//nolint
/*
Copyright (c) 2013-2016 Errplane Inc.
This code is originally from: https://github.com/influxdata/influxdb/blob/1.7/services/meta/client.go

2022.01.23 Changed the transport from HTTP to RPC.
Added TagKeys, FieldKeys, etc.
Copyright 2022 Huawei Cloud Computing Technologies Co., Ltd.
*/

package metaclient

import (
	"bytes"
	crand "crypto/rand"
	"crypto/sha256"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"net/url"
	"os"
	"path"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	set "github.com/deckarep/golang-set"
	"github.com/influxdata/influxdb/models"
	originql "github.com/influxdata/influxql"
	"github.com/openGemini/openGemini/app/ts-meta/meta/message"
	"github.com/openGemini/openGemini/engine/executor/spdy"
	"github.com/openGemini/openGemini/engine/executor/spdy/transport"
	"github.com/openGemini/openGemini/engine/op"
	"github.com/openGemini/openGemini/lib/config"
	"github.com/openGemini/openGemini/lib/errno"
	"github.com/openGemini/openGemini/lib/logger"
	"github.com/openGemini/openGemini/lib/obs"
	"github.com/openGemini/openGemini/lib/rand"
	"github.com/openGemini/openGemini/lib/statisticsPusher/statistics"
	"github.com/openGemini/openGemini/lib/sysinfo"
	"github.com/openGemini/openGemini/lib/util"
	"github.com/openGemini/openGemini/lib/util/lifted/hashicorp/serf/serf"
	"github.com/openGemini/openGemini/lib/util/lifted/influx/influxql"
	meta2 "github.com/openGemini/openGemini/lib/util/lifted/influx/meta"
	proto2 "github.com/openGemini/openGemini/lib/util/lifted/influx/meta/proto"
	"github.com/openGemini/openGemini/lib/util/lifted/influx/query"
	"github.com/openGemini/openGemini/lib/util/lifted/protobuf/proto"
	"github.com/openGemini/openGemini/lib/util/lifted/vm/protoparser/influx"
	"go.uber.org/zap"
	"golang.org/x/crypto/pbkdf2"
	"golang.org/x/text/encoding/unicode"
	"golang.org/x/text/transform"
)

const (
	// errSleep is the time to sleep after we've failed on every metaserver
	// before making another pass
	errSleep = time.Second

	// SaltBytes is the number of bytes used for salts.
	SaltBytes = 32

	// pbkdf2 iteration counts
	pbkdf2Iter4096 = 4096
	pbkdf2Iter1000 = 1000

	// the length of the key generated by pbkdf2
	pbkdf2KeyLen = 32

	// hash algorithm version flags stored with saved passwords
	hashAlgoVerOne = "#Ver:001#"
	// PBKDF2 iterated 4096 times
	hashAlgoVerTwo = "#Ver:002#"
	// PBKDF2 iterated 1000 times
	hashAlgoVerThree = "#Ver:003#"

	maxDbOrRpName = 256

	// ShardGroupDeletedExpiration is the amount of time before a shard group info will be removed from cached
	// data after it has been marked deleted (2 weeks).
	ShardGroupDeletedExpiration = -2 * 7 * 24 * time.Hour
	IndexGroupDeletedExpiration = -2 * 7 * 24 * time.Hour

	RPCReqTimeout       = 10 * time.Second
	HttpSnapshotTimeout = 4 * time.Second

	// for user lockout
	maxLoginLimit      = 5    // maximum number of failed login attempts before a user is locked
	authFailCacheLimit = 200  // size of the channel that buffers authentication results
	lockUserTime       = 30   // user lock duration, in seconds
	maxLoginValidTime  = 3600 // validity duration of login records, in seconds

	minUsernameLen = 4   // Minimum username length
	maxUsernameLen = 100 // Maximum username length

	minPasswordLen = 8   // Minimum password length
	maxPasswordLen = 256 // Maximum password length
)
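// For example, with the values above a user is locked after five failed
// logins (with no successful login in between), the lock is lifted 30 seconds
// after the most recent recorded failure, and failure records are purged once
// they are more than an hour old.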

type Role int

const (
	SQL Role = iota
	STORE
	META

	algoVer01 = 1
	algoVer02 = 2
	algoVer03 = 3
)

var (
	ErrNameTooLong          = fmt.Errorf("database name must have fewer than %d characters", maxDbOrRpName)
	RetryGetUserInfoTimeout = 5 * time.Second
	RetryExecTimeout        = 60 * time.Second
	RetryReportTimeout      = 60 * time.Second
	HttpReqTimeout          = 10 * time.Second
)

var DefaultTypeMapper = influxql.MultiTypeMapper(
	op.TypeMapper{},
	query.MathTypeMapper{},
	query.FunctionTypeMapper{},
	query.StringFunctionTypeMapper{},
)

var DefaultMetaClient *Client
var cliOnce sync.Once

type StorageNodeInfo struct {
	InsertAddr string
	SelectAddr string
}

type FieldKey struct {
	Field     string
	FieldType int32
}

type FieldKeys []FieldKey

func (a FieldKeys) Len() int           { return len(a) }
func (a FieldKeys) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a FieldKeys) Less(i, j int) bool { return a[i].Field < a[j].Field }
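// A minimal sorting sketch (field names and type codes are illustrative):
//
//	keys := FieldKeys{{Field: "usage", FieldType: 3}, {Field: "host", FieldType: 4}}
//	sort.Sort(keys) // keys are now ordered by Field: "host", "usage"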

// MetaClient is an interface for accessing meta data.
type MetaClient interface {
	CreateMeasurement(database, retentionPolicy, mst string, shardKey *meta2.ShardKeyInfo, indexR *influxql.IndexRelation, engineType config.EngineType,
		colStoreInfo *meta2.ColStoreInfo, schemaInfo []*proto2.FieldSchema, options *meta2.Options) (*meta2.MeasurementInfo, error)
	AlterShardKey(database, retentionPolicy, mst string, shardKey *meta2.ShardKeyInfo) error
	CreateDatabase(name string, enableTagArray bool, replicaN uint32, options *obs.ObsOptions) (*meta2.DatabaseInfo, error)
	CreateDatabaseWithRetentionPolicy(name string, spec *meta2.RetentionPolicySpec, shardKey *meta2.ShardKeyInfo, enableTagArray bool, replicaN uint32) (*meta2.DatabaseInfo, error)
	CreateRetentionPolicy(database string, spec *meta2.RetentionPolicySpec, makeDefault bool) (*meta2.RetentionPolicyInfo, error)
	CreateSubscription(database, rp, name, mode string, destinations []string) error
	CreateUser(name, password string, admin, rwuser bool) (meta2.User, error)
	Databases() map[string]*meta2.DatabaseInfo
	Database(name string) (*meta2.DatabaseInfo, error)
	DataNode(id uint64) (*meta2.DataNode, error)
	DataNodes() ([]meta2.DataNode, error)
	AliveReadNodes() ([]meta2.DataNode, error)
	DeleteDataNode(id uint64) error
	DeleteMetaNode(id uint64) error
	DropShard(id uint64) error
	DropSubscription(database, rp, name string) error
	DropUser(name string) error
	MetaNodes() ([]meta2.NodeInfo, error)
	RetentionPolicy(database, name string) (rpi *meta2.RetentionPolicyInfo, err error)
	SetAdminPrivilege(username string, admin bool) error
	SetPrivilege(username, database string, p originql.Privilege) error
	ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta2.ShardInfo, err error)
	ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta2.ShardGroupInfo, err error)
	UpdateRetentionPolicy(database, name string, rpu *meta2.RetentionPolicyUpdate, makeDefault bool) error
	UpdateUser(name, password string) error
	UserPrivilege(username, database string) (*originql.Privilege, error)
	UserPrivileges(username string) (map[string]originql.Privilege, error)
	Users() []meta2.UserInfo
	MarkDatabaseDelete(name string) error
	MarkRetentionPolicyDelete(database, name string) error
	MarkMeasurementDelete(database, mst string) error
	DBPtView(database string) (meta2.DBPtInfos, error)
	ShardOwner(shardID uint64) (database, policy string, sgi *meta2.ShardGroupInfo)
	Measurement(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error)
	Schema(database string, retentionPolicy string, mst string) (fields map[string]int32, dimensions map[string]struct{}, err error)
	GetMeasurements(m *influxql.Measurement) ([]*meta2.MeasurementInfo, error)
	TagKeys(database string) map[string]set.Set
	FieldKeys(database string, ms influxql.Measurements) (map[string]map[string]int32, error)
	QueryTagKeys(database string, ms influxql.Measurements, cond influxql.Expr) (map[string]map[string]struct{}, error)
	MatchMeasurements(database string, ms influxql.Measurements) (map[string]*meta2.MeasurementInfo, error)
	Measurements(database string, ms influxql.Measurements) ([]string, error)
	ShowShards() models.Rows
	ShowShardGroups() models.Rows
	ShowSubscriptions() models.Rows
	ShowRetentionPolicies(database string) (models.Rows, error)
	ShowCluster() models.Rows
	ShowClusterWithCondition(nodeType string, ID uint64) (models.Rows, error)
	GetAliveShards(database string, sgi *meta2.ShardGroupInfo) []int
	NewDownSamplePolicy(database, name string, info *meta2.DownSamplePolicyInfo) error
	DropDownSamplePolicy(database, name string, dropAll bool) error
	ShowDownSamplePolicies(database string) (models.Rows, error)
	GetMstInfoWithInRp(dbName, rpName string, dataTypes []int64) (*meta2.RpMeasurementsFieldsInfo, error)
	AdminUserExists() bool
	Authenticate(username, password string) (u meta2.User, e error)
	UpdateShardDownSampleInfo(Ident *meta2.ShardIdentifier) error
	OpenAtStore()
	UpdateStreamMstSchema(database string, retentionPolicy string, mst string, stmt *influxql.SelectStatement) error
	CreateStreamPolicy(info *meta2.StreamInfo) error
	GetStreamInfos() map[string]*meta2.StreamInfo
	GetStreamInfosStore() map[string]*meta2.StreamInfo
	ShowStreams(database string, showAll bool) (models.Rows, error)
	DropStream(name string) error
	GetMeasurementInfoStore(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error)
	GetMeasurementsInfoStore(dbName string, rpName string) (*meta2.MeasurementsInfo, error)
	GetAllMst(dbName string) []string
	RetryRegisterQueryIDOffset(host string) (uint64, error)
	ThermalShards(db string, start, end time.Duration) map[uint64]struct{}
	GetNodePtsMap(database string) (map[uint64][]uint32, error)
	DBRepGroups(database string) []meta2.ReplicaGroup
	GetReplicaN(database string) (int, error)

	// for continuous query
	SendSql2MetaHeartbeat(host string) error
	CreateContinuousQuery(database, name, query string) error
	ShowContinuousQueries() (models.Rows, error)
	DropContinuousQuery(name string, database string) error
	UpdateShardInfoTier(shardID uint64, tier uint64, dbName, rpName string) error

	// sysctrl for admin
	SendSysCtrlToMeta(mod string, param map[string]string) (map[string]string, error)
}

type LoadCtx struct {
	LoadCh    chan *DBPTCtx
	ReportCtx sync.Pool
}

func (ctx *LoadCtx) GetReportCtx() *DBPTCtx {
	v := ctx.ReportCtx.Get()
	if v == nil {
		return &DBPTCtx{}
	}
	return v.(*DBPTCtx)
}

func (ctx *LoadCtx) PutReportCtx(dbPTCtx *DBPTCtx) {
	dbPTCtx.DBPTStat.DB = proto.String("")
	dbPTCtx.DBPTStat.PtID = proto.Uint32(0)
	dbPTCtx.putRpStat(&dbPTCtx.DBPTStat.RpStats)
	ctx.ReportCtx.Put(dbPTCtx)
}
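// A typical borrow/fill/return cycle for the report context pool (a sketch;
// the real reporting loop lives in the store layer):
//
//	dbpt := loadCtx.GetReportCtx()
//	stat := dbpt.GetDBPTStat()
//	stat.DB = proto.String("db0")
//	stat.PtID = proto.Uint32(0)
//	loadCtx.LoadCh <- dbpt
//	// the consumer later returns the context with loadCtx.PutReportCtx(dbpt)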

type DBPTCtx struct {
	DBPTStat     *proto2.DBPtStatus
	RpStatusPool sync.Pool
}

func (r *DBPTCtx) GetDBPTStat() *proto2.DBPtStatus {
	if r.DBPTStat == nil {
		r.DBPTStat = &proto2.DBPtStatus{}
	}
	return r.DBPTStat
}

func (r *DBPTCtx) GetRpStat() []*proto2.RpShardStatus {
	v := r.RpStatusPool.Get()
	if v == nil {
		return []*proto2.RpShardStatus{}
	}
	return *(v.(*[]*proto2.RpShardStatus))
}

func (r *DBPTCtx) putRpStat(rss *[]*proto2.RpShardStatus) {
	for i := range *rss {
		(*rss)[i].RpName = proto.String("")
		(*rss)[i].ShardStats.ShardID = proto.Uint64(0)
		(*rss)[i].ShardStats.ShardSize = proto.Uint64(0)
		(*rss)[i].ShardStats.SeriesCount = proto.Int32(0)
		(*rss)[i].ShardStats.MaxTime = proto.Int64(0)
	}
	*rss = (*rss)[:0]
	r.RpStatusPool.Put(rss)
}

func (r *DBPTCtx) String() string {
	if r.DBPTStat == nil {
		return ""
	}

	return r.DBPTStat.String()
}

type SendRPCMessage interface {
	SendRPCMsg(currentServer int, msg *message.MetaMessage, callback transport.Callback) error
}

type RPCMessageSender struct{}

func (s *RPCMessageSender) SendRPCMsg(currentServer int, msg *message.MetaMessage, callback transport.Callback) error {
	trans, err := transport.NewMetaTransport(uint64(currentServer), spdy.MetaRequest, callback)
	if err != nil {
		return err
	}
	trans.SetTimeout(RPCReqTimeout)
	if err = trans.Send(msg); err != nil {
		return err
	}
	if err = trans.Wait(); err != nil {
		return err
	}
	refreshConnectedServer(currentServer)
	return nil
}
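// A minimal sketch of how callers drive the sender (message and callback
// types must pair up, as in Ping further below; "sender" is illustrative):
//
//	sender := &RPCMessageSender{}
//	callback := &PingCallback{}
//	msg := message.NewMetaMessage(message.PingRequestMessage, &message.PingRequest{All: 1})
//	err := sender.SendRPCMsg(0, msg, callback)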

// Client is used to execute commands on and read data from
// a meta service cluster.
type Client struct {
	tls            bool
	logger         *logger.Logger
	nodeID         uint64
	ShardDurations map[uint64]*meta2.ShardDurationInfo
	DBBriefInfos   map[string]*meta2.DatabaseBriefInfo

	mu          sync.RWMutex
	metaServers []string
	closing     chan struct{}
	changed     chan chan struct{}
	cacheData   *meta2.Data

	// Authentication cache.
	authCache map[string]authUser

	weakPwdPath string

	retentionAutoCreate bool

	ShardTier uint64

	// state for locking users after repeated authentication failures
	arChan       chan *authRcd
	muAuthData   sync.RWMutex
	authFailRcds map[string]authFailCache
	authSuccRcds map[string]time.Time
	// selected password hash algorithm version
	optAlgoVer int

	replicaInfoManager *ReplicaInfoManager

	// send RPC message interface.
	SendRPCMessage
}

type authRcd struct {
	user      string
	result    bool // auth result (success or failure)
	occurTime time.Time
}

type authFailCache struct {
	user         string
	occurTimeLst []time.Time // auth time list
}

type authUser struct {
	bhash string
	salt  []byte
	hash  []byte
}

// NewClient returns a new *Client.
func NewClient(weakPwdPath string, retentionAutoCreate bool, maxConcurrentWriteLimit int) *Client {
	cli := &Client{
		cacheData:           &meta2.Data{},
		closing:             make(chan struct{}),
		changed:             make(chan chan struct{}, maxConcurrentWriteLimit),
		authCache:           make(map[string]authUser),
		weakPwdPath:         weakPwdPath,
		retentionAutoCreate: retentionAutoCreate,
		logger:              logger.NewLogger(errno.ModuleMetaClient).With(zap.String("service", "metaclient")),
		arChan:              make(chan *authRcd, authFailCacheLimit),
		authFailRcds:        make(map[string]authFailCache),
		authSuccRcds:        make(map[string]time.Time),
		replicaInfoManager:  NewReplicaInfoManager(),
		SendRPCMessage:      &RPCMessageSender{},
	}
	cliOnce.Do(func() {
		DefaultMetaClient = cli
	})

	return cli
}
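// A minimal bootstrap sketch (the path, addresses and write limit are
// illustrative):
//
//	cli := NewClient("/opt/conf/weakpwd.txt", true, 64)
//	cli.SetMetaServers([]string{"127.0.0.1:8092", "127.0.0.2:8092"})
//	if err := cli.Open(); err != nil {
//		// handle the error
//	}
//	defer cli.Close()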

func cvtDataForAlgoVer(ver string) int {
	var rst int
	switch ver {
	case "ver01":
		rst = algoVer01
	case "ver02":
		rst = algoVer02
	case "ver03":
		rst = algoVer03
	default:
		rst = algoVer02
	}
	return rst
}

func (c *Client) SetHashAlgo(optHashAlgo string) {
	c.optAlgoVer = cvtDataForAlgoVer(optHashAlgo)
}
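// For example, SetHashAlgo("ver03") selects the PBKDF2 variant with 1000
// iterations; any unrecognized value falls back to algoVer02 (PBKDF2 with
// 4096 iterations).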

// Open a connection to a meta service cluster.
func (c *Client) Open() error {
	c.cacheData = c.retryUntilSnapshot(SQL, 0)
	go c.pollForUpdates(SQL)

	go c.updateAuthCacheData()
	return nil
}

func (c *Client) OpenAtStore() {
	c.cacheData = c.retryUntilSnapshot(STORE, 0)
	go c.pollForUpdates(STORE)
	go c.verifyDataNodeStatus()
}

// Close the meta service cluster connection.
func (c *Client) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if t, ok := http.DefaultTransport.(*http.Transport); ok {
		t.CloseIdleConnections()
	}

	select {
	case <-c.closing:
		return nil
	default:
		close(c.closing)
	}

	return nil
}

// NodeID returns the client's node ID.
func (c *Client) NodeID() uint64 { return c.nodeID }

func (c *Client) SetTier(tier string) error {
	c.ShardTier = meta2.StringToTier(strings.ToUpper(tier))
	if c.ShardTier == util.TierBegin {
		return fmt.Errorf("invalid tier %s", tier)
	}
	return nil
}

// SetMetaServers updates the meta servers on the client.
func (c *Client) SetMetaServers(a []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.metaServers = a

	for i, server := range a {
		transport.NewMetaNodeManager().Add(uint64(i), server)
	}
}

// SetTLS sets whether the client should use TLS when connecting.
// This function is not safe for concurrent use.
func (c *Client) SetTLS(v bool) { c.tls = v }

// Ping sends a ping request to the meta service over RPC. If
// checkAllMetaServers is set to true, the receiving node is asked to verify
// the health of all meta servers in the cluster.
func (c *Client) Ping(checkAllMetaServers bool) error {
	all := 0
	if checkAllMetaServers {
		all = 1
	}
	callback := &PingCallback{}
	msg := message.NewMetaMessage(message.PingRequestMessage, &message.PingRequest{All: all})
	err := c.SendRPCMsg(0, msg, callback)
	if err != nil {
		return err
	}
	return fmt.Errorf(string(callback.Leader))
}

// ClusterID returns the ID of the cluster it's connected to.
func (c *Client) ClusterID() uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.cacheData.ClusterID
}

// DataNode returns a node by id.
func (c *Client) DataNode(id uint64) (*meta2.DataNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	for i := range c.cacheData.DataNodes {
		if c.cacheData.DataNodes[i].ID == id {
			return &c.cacheData.DataNodes[i], nil
		}
	}
	return nil, meta2.ErrNodeNotFound
}

func (c *Client) AliveReadNodes() ([]meta2.DataNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	var aliveReaders []meta2.DataNode
	var aliveDefault []meta2.DataNode
	for _, n := range c.cacheData.DataNodes {
		if n.Status != serf.StatusAlive {
			continue
		}
		if n.Role == meta2.NodeReader {
			aliveReaders = append(aliveReaders, n)
		} else if n.Role == meta2.NodeDefault {
			aliveDefault = append(aliveDefault, n)
		}
	}

	if len(aliveReaders) == 0 {
		if len(aliveDefault) == 0 {
			return nil, fmt.Errorf("there is no data nodes for querying")
		}
		sort.Sort(meta2.DataNodeInfos(aliveDefault))
		return aliveDefault, nil
	}

	sort.Sort(meta2.DataNodeInfos(aliveReaders))
	return aliveReaders, nil
}

// DataNodes returns the data nodes' info.
func (c *Client) DataNodes() ([]meta2.DataNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.DataNodes, nil
}

func (c *Client) GetAllMst(dbName string) []string {
	var mstName []string
	c.mu.RLock()
	defer c.mu.RUnlock()
	if _, ok := c.cacheData.Databases[dbName]; !ok {
		return nil
	}

	for _, rp := range c.cacheData.Databases[dbName].RetentionPolicies {
		for mst := range rp.Measurements {
			mstName = append(mstName, mst)
		}
	}
	return mstName
}

// CreateDataNode will create a new data node in the metastore
func (c *Client) CreateDataNode(writeHost, queryHost, role string) (uint64, uint64, uint64, error) {
	currentServer := connectedServer
	for {
		// exit if we're closed
		select {
		case <-c.closing:
			return 0, 0, 0, meta2.ErrClientClosed
		default:
			// we're still open, continue on
		}
		c.mu.RLock()
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()

		node, err := c.getNode(currentServer, writeHost, queryHost, role)

		if err == nil && node.NodeId > 0 {
			c.nodeID = node.NodeId
			return c.nodeID, node.LTime, node.ConnId, nil
		}

		c.logger.Warn("get node failed", zap.Error(err), zap.String("writeHost", writeHost), zap.String("queryHost", queryHost))
		time.Sleep(errSleep)

		currentServer++
	}
}

// DataNodeByHTTPHost returns the data node with the given HTTP bind address.
func (c *Client) DataNodeByHTTPHost(httpAddr string) (*meta2.DataNode, error) {
	nodes, err := c.DataNodes()
	if err != nil {
		return nil, err
	}
	for _, n := range nodes {
		if n.Host == httpAddr {
			return &n, nil
		}
	}

	return nil, meta2.ErrNodeNotFound
}

// DataNodeByTCPHost returns the data node with the given TCP bind address.
func (c *Client) DataNodeByTCPHost(tcpAddr string) (*meta2.DataNode, error) {
	nodes, err := c.DataNodes()
	if err != nil {
		return nil, err
	}
	for _, n := range nodes {
		if n.TCPHost == tcpAddr {
			return &n, nil
		}
	}

	return nil, meta2.ErrNodeNotFound
}

// DeleteDataNode deletes a data node from the cluster.
func (c *Client) DeleteDataNode(id uint64) error {
	cmd := &proto2.DeleteDataNodeCommand{
		ID: proto.Uint64(id),
	}

	return c.retryUntilExec(proto2.Command_DeleteDataNodeCommand, proto2.E_DeleteDataNodeCommand_Command, cmd)
}

// MetaNodes returns the meta nodes' info.
func (c *Client) MetaNodes() ([]meta2.NodeInfo, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.MetaNodes, nil
}

// MetaNodeByAddr returns the meta node's info.
func (c *Client) MetaNodeByAddr(addr string) *meta2.NodeInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	for _, n := range c.cacheData.MetaNodes {
		if n.Host == addr {
			return &n
		}
	}
	return nil
}

func (c *Client) FieldKeys(database string, ms influxql.Measurements) (map[string]map[string]int32, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	mis, err := c.matchMeasurements(database, ms)
	if err != nil {
		return nil, err
	}
	if len(mis) == 0 {
		return nil, nil
	}

	ret := make(map[string]map[string]int32, len(mis))
	for _, m := range mis {
		ret[m.OriginName()] = make(map[string]int32)
		m.FieldKeys(ret)
	}
	return ret, nil
}

func (c *Client) TagKeys(database string) map[string]set.Set {
	c.mu.RLock()
	defer c.mu.RUnlock()
	dbi := c.cacheData.Database(database)
	uniqueMap := make(map[string]set.Set)

	dbi.WalkRetentionPolicy(func(rp *meta2.RetentionPolicyInfo) {
		rp.EachMeasurements(func(mst *meta2.MeasurementInfo) {
			s := set.NewSet()
			for key := range mst.Schema {
				if mst.Schema[key] == influx.Field_Type_Tag {
					s.Add(key)
				}
			}
			_, ok := uniqueMap[mst.Name]
			if ok {
				uniqueMap[mst.Name] = uniqueMap[mst.Name].Union(s)
				return
			}
			uniqueMap[mst.Name] = s
		})
	})

	return uniqueMap
}

func (c *Client) GetMeasurements(m *influxql.Measurement) ([]*meta2.MeasurementInfo, error) {
	var measurements []*meta2.MeasurementInfo
	c.mu.RLock()
	defer c.mu.RUnlock()
	dbi, err := c.cacheData.GetDatabase(m.Database)
	if err != nil {
		return nil, err
	}

	rpi, err := dbi.GetRetentionPolicy(m.RetentionPolicy)
	if err != nil {
		return nil, err
	}

	if m.Regex != nil {
		rpi.EachMeasurements(func(msti *meta2.MeasurementInfo) {
			if m.Regex.Val.Match([]byte(influx.GetOriginMstName(msti.Name))) {
				measurements = append(measurements, msti)
			}
		})
		sort.Slice(measurements, func(i, j int) bool {
			return influx.GetOriginMstName(measurements[i].Name) < influx.GetOriginMstName(measurements[j].Name)
		})
	} else {
		msti, err := rpi.GetMeasurement(m.Name)
		if err != nil {
			return nil, err
		}
		measurements = append(measurements, msti)
	}
	return measurements, nil
}

func (c *Client) Measurement(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.Measurement(database, rpName, mstName)
}

func (c *Client) RetryGetMeasurementInfoStore(database string, rpName string, mstName string) ([]byte, error) {
	startTime := time.Now()
	currentServer := connectedServer
	var err error
	var info []byte
	for {
		c.mu.RLock()
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil, errors.New("GetMeasurementInfoStore fail")
		default:

		}

		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		info, err = c.getMeasurementInfo(currentServer, database, rpName, mstName)
		if err == nil {
			break
		}

		if time.Since(startTime).Seconds() > float64(len(c.metaServers))*HttpReqTimeout.Seconds() {
			break
		}
		time.Sleep(errSleep)

		currentServer++
	}
	return info, err
}

func (c *Client) getMeasurementInfo(currentServer int, database string, rpName string, mstName string) ([]byte, error) {
	callback := &GetMeasurementInfoCallback{}
	msg := message.NewMetaMessage(message.GetMeasurementInfoRequestMessage, &message.GetMeasurementInfoRequest{
		DbName: database, RpName: rpName, MstName: mstName})
	err := c.SendRPCMsg(currentServer, msg, callback)
	if err != nil {
		if !strings.Contains(err.Error(), "node is not the leader") {
			c.logger.Error("GetMeasurementInfo SendRPCMsg fail", zap.Error(err))
		}
		return nil, err
	}
	return callback.Data, nil
}

func (c *Client) GetMeasurementInfoStore(dbName string, rpName string, mstName string) (*meta2.MeasurementInfo, error) {
	b, err := c.RetryGetMeasurementInfoStore(dbName, rpName, mstName)
	if err != nil {
		return nil, err
	}
	mst := &meta2.MeasurementInfo{}
	if err = mst.UnmarshalBinary(b); err != nil {
		return nil, err
	}
	return mst, nil
}

func (c *Client) GetMeasurementsInfoStore(dbName string, rpName string) (*meta2.MeasurementsInfo, error) {
	b, err := c.RetryGetMeasurementsInfoStore(dbName, rpName)
	if err != nil {
		return nil, err
	}
	mst := &meta2.MeasurementsInfo{}
	if err = mst.UnmarshalBinary(b); err != nil {
		return nil, err
	}
	return mst, nil
}

func (c *Client) RetryGetMeasurementsInfoStore(database string, rpName string) ([]byte, error) {
	startTime := time.Now()
	currentServer := connectedServer
	var err error
	var info []byte
	for {
		c.mu.RLock()
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil, errors.New("GetMeasurementsInfoStore fail")
		default:

		}

		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		info, err = c.getMeasurementsInfo(currentServer, database, rpName)
		if err == nil {
			break
		}

		if time.Since(startTime).Seconds() > float64(len(c.metaServers))*HttpReqTimeout.Seconds() {
			break
		}
		time.Sleep(errSleep)

		currentServer++
	}
	return info, err
}

func (c *Client) getMeasurementsInfo(currentServer int, database string, rpName string) ([]byte, error) {
	callback := &GetMeasurementsInfoCallback{}
	msg := message.NewMetaMessage(message.GetMeasurementsInfoRequestMessage, &message.GetMeasurementsInfoRequest{
		DbName: database, RpName: rpName})
	err := c.SendRPCMsg(currentServer, msg, callback)
	if err != nil {
		c.logger.Error("GetMeasurementInfoR SendRPCMsg fail", zap.Error(err))
		return nil, err
	}
	return callback.Data, nil
}

// Database returns info for the requested database.
func (c *Client) Database(name string) (*meta2.DatabaseInfo, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.GetDatabase(name)
}

// Databases returns a list of all database infos.
func (c *Client) Databases() map[string]*meta2.DatabaseInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.cacheData.Databases
}

func (c *Client) ShowShards() models.Rows {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowShards()
}

func (c *Client) ShowShardGroups() models.Rows {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowShardGroups()
}

func (c *Client) ShowSubscriptions() models.Rows {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowSubscriptions()
}

func (c *Client) ShowRetentionPolicies(database string) (models.Rows, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowRetentionPolicies(database)
}

func (c *Client) ShowCluster() models.Rows {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowCluster()
}

func (c *Client) ShowClusterWithCondition(nodeType string, ID uint64) (models.Rows, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowClusterWithCondition(nodeType, ID)
}

func (c *Client) Schema(database string, retentionPolicy string, mst string) (fields map[string]int32,
	dimensions map[string]struct{}, err error) {
	fields = make(map[string]int32)
	dimensions = make(map[string]struct{})
	msti, err := c.Measurement(database, retentionPolicy, mst)
	if err != nil {
		return nil, nil, err
	}

	for key := range msti.Schema {
		if msti.Schema[key] == influx.Field_Type_Tag {
			dimensions[key] = struct{}{}
		} else {
			fields[key] = msti.Schema[key]
		}
	}
	return fields, dimensions, nil
}

func (c *Client) UpdateSchema(database string, retentionPolicy string, mst string, fieldToCreate []*proto2.FieldSchema) error {
	cmd := &proto2.UpdateSchemaCommand{
		Database:      proto.String(database),
		RpName:        proto.String(retentionPolicy),
		Measurement:   proto.String(mst),
		FieldToCreate: fieldToCreate,
	}

	err := c.retryUntilExec(proto2.Command_UpdateSchemaCommand, proto2.E_UpdateSchemaCommand_Command, cmd)
	if err != nil {
		return err
	}
	return nil
}

func (c *Client) CreateMeasurement(database, retentionPolicy, mst string, shardKey *meta2.ShardKeyInfo, indexR *influxql.IndexRelation,
	engineType config.EngineType, colStoreInfo *meta2.ColStoreInfo, schemaInfo []*proto2.FieldSchema, options *meta2.Options) (*meta2.MeasurementInfo, error) {
	msti, err := c.Measurement(database, retentionPolicy, mst)
	if msti != nil {
		// check shardkey equal or not
		n := len(msti.ShardKeys)
		if n == 0 || !shardKey.EqualsToAnother(&msti.ShardKeys[n-1]) {
			return nil, meta2.ErrMeasurementExists
		}
		return msti, nil
	}
	if err != meta2.ErrMeasurementNotFound {
		return nil, err
	}

	if !meta2.ValidMeasurementName(mst) {
		return nil, errno.NewError(errno.InvalidMeasurement, mst)
	}

	cmd := &proto2.CreateMeasurementCommand{
		DBName:     proto.String(database),
		RpName:     proto.String(retentionPolicy),
		Name:       proto.String(mst),
		EngineType: proto.Uint32(uint32(engineType)),
	}

	if shardKey != nil {
		cmd.Ski = shardKey.Marshal()
	}

	if indexR != nil {
		if msti == nil {
			indexR.Rid = 0
		} else {
			indexR.Rid = 1
		}
		cmd.IR = meta2.EncodeIndexRelation(indexR)
	}

	if colStoreInfo != nil {
		cmd.ColStoreInfo = colStoreInfo.Marshal()
	}

	if len(schemaInfo) > 0 {
		cmd.SchemaInfo = schemaInfo
	}

	if options != nil {
		cmd.Options = options.Marshal()
	}

	err = c.retryUntilExec(proto2.Command_CreateMeasurementCommand, proto2.E_CreateMeasurementCommand_Command, cmd)
	if err != nil {
		return nil, err
	}
	return c.Measurement(database, retentionPolicy, mst)
}

func (c *Client) AlterShardKey(database, retentionPolicy, mst string, shardKey *meta2.ShardKeyInfo) error {
	_, err := c.Measurement(database, retentionPolicy, mst)
	if err != nil {
		return err
	}

	cmd := &proto2.AlterShardKeyCmd{
		DBName: proto.String(database),
		RpName: proto.String(retentionPolicy),
		Name:   proto.String(mst),
		Ski:    shardKey.Marshal(),
	}

	return c.retryUntilExec(proto2.Command_AlterShardKeyCmd, proto2.E_AlterShardKeyCmd_Command, cmd)
}

// CreateDatabase creates a database or returns it if it already exists.
func (c *Client) CreateDatabase(name string, enableTagArray bool, replicaN uint32, options *obs.ObsOptions) (*meta2.DatabaseInfo, error) {
	if strings.Count(name, "") > maxDbOrRpName {
		return nil, ErrNameTooLong
	}

	var err error
	replicaN, _, err = checkAndUpdateReplication(replicaN, nil)
	if err != nil {
		return nil, err
	}

	db, err := c.Database(name)
	if db != nil || !errno.Equal(err, errno.DatabaseNotFound) {
		return db, err
	}

	cmd := &proto2.CreateDatabaseCommand{
		Name:           proto.String(name),
		EnableTagArray: proto.Bool(enableTagArray),
		ReplicaNum:     proto.Uint32(replicaN),
	}

	if options != nil {
		cmd.Options = meta2.MarshalObsOptions(options)
	}

	err = c.retryUntilExec(proto2.Command_CreateDatabaseCommand, proto2.E_CreateDatabaseCommand_Command, cmd)
	if err != nil {
		return nil, err
	}

	return c.Database(name)
}
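// A minimal usage sketch (the database name and arguments are illustrative):
//
//	db, err := cli.CreateDatabase("db0", false, 1, nil)
//	if err != nil {
//		// handle ErrNameTooLong, replication errors or RPC failures
//	}
//	_ = db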

func checkAndUpdateReplication(dbReplicaN uint32, rpReplicaN *int) (uint32, *int, error) {
	oneReplication := 1
	if dbReplicaN == 0 && rpReplicaN == nil {
		// Default number of replication: 1
		dbReplicaN = 1
		rpReplicaN = &oneReplication
	} else if dbReplicaN == 0 && rpReplicaN != nil {
		dbReplicaN = uint32(*rpReplicaN)
	} else if dbReplicaN != 0 && rpReplicaN == nil {
		rpReplicaN = &oneReplication
		*rpReplicaN = int(dbReplicaN)
	}

	if dbReplicaN != uint32(*rpReplicaN) {
		return dbReplicaN, rpReplicaN, errno.NewError(errno.ReplicaNumberNotEqual)
	}
	if dbReplicaN > 2 {
		return dbReplicaN, rpReplicaN, errno.NewError(errno.ReplicaNumberNotSupport)
	}
	if dbReplicaN > 1 && config.GetHaPolicy() != config.Replication {
		return dbReplicaN, rpReplicaN, errno.NewError(errno.ConflictWithRep)
	}
	return dbReplicaN, rpReplicaN, nil
}
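// Worked examples of how the two replication inputs are reconciled:
//
//	checkAndUpdateReplication(0, nil) // -> 1, &1, nil (both default to one replica)
//	checkAndUpdateReplication(2, nil) // -> 2, &2, nil (rp inherits the db value; requires the "replication" HA policy)
//	checkAndUpdateReplication(3, nil) // -> error (more than two replicas is not supported)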

// CreateDatabaseWithRetentionPolicy creates a database with the specified
// retention policy.
//
// When creating a database with a retention policy, the retention policy will
// always be set to default. Therefore if the caller provides a retention policy
// that already exists on the database, but that retention policy is not the
// default one, an error will be returned.
//
// This call is only idempotent when the caller provides the exact same
// retention policy, and that retention policy is already the default for the
// database.
func (c *Client) CreateDatabaseWithRetentionPolicy(name string, spec *meta2.RetentionPolicySpec, shardKey *meta2.ShardKeyInfo,
	enableTagArray bool, replicaN uint32) (*meta2.DatabaseInfo, error) {
	if spec == nil {
		return nil, errors.New("CreateDatabaseWithRetentionPolicy called with nil spec")
	}

	var err error
	replicaN, spec.ReplicaN, err = checkAndUpdateReplication(replicaN, spec.ReplicaN)
	if err != nil {
		return nil, err
	}

	rpi := spec.NewRetentionPolicyInfo()
	if err := rpi.CheckSpecValid(); err != nil {
		return nil, err
	}

	db, err := c.Database(name)
	if err != nil && !errno.Equal(err, errno.DatabaseNotFound) {
		return nil, err
	}

	if db != nil {
		if !db.ShardKey.EqualsToAnother(shardKey) {
			return nil, errno.NewError(errno.ShardKeyConflict)
		}
		if rp := db.RetentionPolicy(rpi.Name); rp != nil {
			if !rp.EqualsAnotherRp(rpi) {
				return nil, meta2.ErrRetentionPolicyConflict
			}
			return db, nil
		}
	}

	cmd := &proto2.CreateDatabaseCommand{
		Name:            proto.String(name),
		RetentionPolicy: rpi.Marshal(),
		EnableTagArray:  proto.Bool(enableTagArray),
		ReplicaNum:      proto.Uint32(replicaN),
	}

	if len(shardKey.ShardKey) > 0 {
		cmd.Ski = shardKey.Marshal()
	}

	err = c.retryUntilExec(proto2.Command_CreateDatabaseCommand, proto2.E_CreateDatabaseCommand_Command, cmd)
	if err != nil {
		return nil, err
	}

	return c.Database(name)
}

func (c *Client) MarkMeasurementDelete(database, measurement string) error {
	dbi, err := c.Database(database)
	if err != nil {
		return err
	}
	if dbi == nil {
		return nil
	}

	var policy string
	for _, rp := range dbi.RetentionPolicies {
		msti := rp.Measurement(measurement)
		if msti == nil {
			continue
		}
		if msti.MarkDeleted {
			return nil
		}
		policy = rp.Name
		break
	}
	cmd := &proto2.MarkMeasurementDeleteCommand{
		Database:    proto.String(database),
		Policy:      proto.String(policy),
		Measurement: proto.String(measurement),
	}

	return c.retryUntilExec(proto2.Command_MarkMeasurementDeleteCommand, proto2.E_MarkMeasurementDeleteCommand_Command, cmd)
}

func (c *Client) MarkDatabaseDelete(name string) error {
	cmd := &proto2.MarkDatabaseDeleteCommand{
		Name: proto.String(name),
	}

	return c.retryUntilExec(proto2.Command_MarkDatabaseDeleteCommand, proto2.E_MarkDatabaseDeleteCommand_Command, cmd)
}

func (c *Client) MarkRetentionPolicyDelete(database, name string) error {
	cmd := &proto2.MarkRetentionPolicyDeleteCommand{
		Database: proto.String(database),
		Name:     proto.String(name),
	}

	return c.retryUntilExec(proto2.Command_MarkRetentionPolicyDeleteCommand, proto2.E_MarkRetentionPolicyDeleteCommand_Command, cmd)
}

// CreateRetentionPolicy creates a retention policy on the specified database.
func (c *Client) CreateRetentionPolicy(database string, spec *meta2.RetentionPolicySpec, makeDefault bool) (*meta2.RetentionPolicyInfo, error) {
	if spec.Duration != nil && *spec.Duration < meta2.MinRetentionPolicyDuration && *spec.Duration != 0 {
		return nil, meta2.ErrRetentionPolicyDurationTooLow
	}

	rpi := spec.NewRetentionPolicyInfo()
	if strings.Count(rpi.Name, "") > maxDbOrRpName {
		return nil, ErrNameTooLong
	}
	cmd := &proto2.CreateRetentionPolicyCommand{
		Database:        proto.String(database),
		RetentionPolicy: rpi.Marshal(),
		DefaultRP:       proto.Bool(makeDefault),
	}

	if err := c.retryUntilExec(proto2.Command_CreateRetentionPolicyCommand, proto2.E_CreateRetentionPolicyCommand_Command, cmd); err != nil {
		return nil, err
	}

	return c.RetentionPolicy(database, rpi.Name)
}

// RetentionPolicy returns the requested retention policy info.
func (c *Client) RetentionPolicy(database, name string) (rpi *meta2.RetentionPolicyInfo, err error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	db := c.cacheData.Database(database)
	if db == nil || db.MarkDeleted {
		return nil, errno.NewError(errno.DatabaseNotFound, database)
	}

	return db.RetentionPolicy(name), nil
}

func (c *Client) GetNodePtsMap(database string) (map[uint64][]uint32, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.cacheData == nil || len(c.cacheData.PtView[database]) == 0 {
		return nil, errno.NewError(errno.DatabaseNotFound, database)
	}
	nodePtMap := make(map[uint64][]uint32, len(c.cacheData.DataNodes))
	repGroups := c.cacheData.DBRepGroups(database)
	ptInfo := c.cacheData.DBPtView(database)
	repGroupN := uint32(len(repGroups))
	for i := range ptInfo {
		if config.GetHaPolicy() == config.WriteAvailableFirst && c.cacheData.PtView[database][i].Status == meta2.Offline {
			continue
		}
		if config.GetHaPolicy() == config.Replication && ptInfo[i].RGID < repGroupN && !repGroups[ptInfo[i].RGID].IsMasterPt(ptInfo[i].PtId) {
			continue
		}
		nodePtMap[ptInfo[i].Owner.NodeID] = append(nodePtMap[ptInfo[i].Owner.NodeID], ptInfo[i].PtId)
	}
	return nodePtMap, nil
}

func (c *Client) DBPtView(database string) (meta2.DBPtInfos, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	pts := c.cacheData.DBPtView(database)
	if pts == nil {
		return nil, errno.NewError(errno.DatabaseNotFound, database)
	}

	return pts, nil
}

func (c *Client) DBRepGroups(database string) []meta2.ReplicaGroup {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.cacheData.DBRepGroups(database)
}

func (c *Client) GetReplicaN(database string) (int, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	db := c.cacheData.Database(database)
	if db == nil {
		return 0, errno.NewError(errno.DatabaseNotFound, database)
	}
	return db.ReplicaN, nil
}

// SetDefaultRetentionPolicy sets a database's default retention policy.
func (c *Client) SetDefaultRetentionPolicy(database, name string) error {
	cmd := &proto2.SetDefaultRetentionPolicyCommand{
		Database: proto.String(database),
		Name:     proto.String(name),
	}

	return c.retryUntilExec(proto2.Command_SetDefaultRetentionPolicyCommand, proto2.E_SetDefaultRetentionPolicyCommand_Command, cmd)
}

// UpdateRetentionPolicy updates a retention policy.
func (c *Client) UpdateRetentionPolicy(database, name string, rpu *meta2.RetentionPolicyUpdate, makeDefault bool) error {
	var newName *string
	if rpu.Name != nil {
		newName = rpu.Name
	}

	var replicaN *uint32
	if rpu.ReplicaN != nil {
		value := uint32(*rpu.ReplicaN)
		replicaN = &value
	}

	cmd := &proto2.UpdateRetentionPolicyCommand{
		Database:           proto.String(database),
		Name:               proto.String(name),
		NewName:            newName,
		Duration:           meta2.GetInt64Duration(rpu.Duration),
		ReplicaN:           replicaN,
		ShardGroupDuration: meta2.GetInt64Duration(rpu.ShardGroupDuration),
		MakeDefault:        proto.Bool(makeDefault),
		HotDuration:        meta2.GetInt64Duration(rpu.HotDuration),
		WarmDuration:       meta2.GetInt64Duration(rpu.WarmDuration),
		IndexGroupDuration: meta2.GetInt64Duration(rpu.IndexGroupDuration),
	}

	return c.retryUntilExec(proto2.Command_UpdateRetentionPolicyCommand, proto2.E_UpdateRetentionPolicyCommand_Command, cmd)
}

// IsLeader - should get rid of this
func (c *Client) IsLeader() bool {
	return false
}

// Users returns a slice of UserInfo representing the currently known users.
func (c *Client) Users() []meta2.UserInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()

	users := c.cacheData.Users

	if users == nil {
		return []meta2.UserInfo{}
	}
	return users
}

// User returns the user with the given name, or ErrUserNotFound.
func (c *Client) User(name string) (meta2.User, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	for _, u := range c.cacheData.Users {
		if u.Name == name {
			return &u, nil
		}
	}

	return nil, meta2.ErrUserNotFound
}

// encryptWithSalt encrypts the plaintext with the given salt, using the configured hash algorithm version.
func (c *Client) encryptWithSalt(salt []byte, plaintext string) []byte {
	switch c.optAlgoVer {
	case algoVer01:
		return c.hashWithSalt(salt, plaintext)
	case algoVer02:
		return c.pbkdf2WithSalt(salt, plaintext, algoVer02)
	case algoVer03:
		return c.pbkdf2WithSalt(salt, plaintext, algoVer03)
	default:
		return nil
	}
}

// saltedEncryptByVer generates a fresh salt and encrypts the plaintext using the configured hash algorithm version.
func (c *Client) saltedEncryptByVer(plaintext string) (salt, hash []byte, err error) {
	switch c.optAlgoVer {
	case algoVer01:
		return c.saltedHash(plaintext)
	case algoVer02:
		return c.saltedPbkdf2(plaintext, algoVer02)
	case algoVer03:
		return c.saltedPbkdf2(plaintext, algoVer03)
	default:
		return nil, nil, meta2.ErrUnsupportedVer
	}
}

// hashWithSalt returns a salted hash of password using salt.
func (c *Client) hashWithSalt(salt []byte, password string) []byte {
	hasher := sha256.New()
	_, err := hasher.Write(salt)
	if err != nil {
		return nil
	}
	_, err = hasher.Write([]byte(password))
	if err != nil {
		return nil
	}
	return hasher.Sum(nil)
}

// saltedHash returns a salt and salted hash of password.
func (c *Client) saltedHash(password string) (salt, hash []byte, err error) {
	salt = make([]byte, SaltBytes)
	if _, err := io.ReadFull(crand.Reader, salt); err != nil {
		return nil, nil, err
	}

	return salt, c.hashWithSalt(salt, password), nil
}

func (c *Client) genHashPwdVal(password string) (string, error) {
	switch c.optAlgoVer {
	case algoVer01:
		return c.genSHA256PwdVal(password)
	default:
		return c.genPbkdf2PwdVal(password)
	}
}

// genSHA256PwdVal generates the salted hash value of the password (using SHA256).
func (c *Client) genSHA256PwdVal(password string) (string, error) {
	// 1. generate a salt and hash of the password
	salt, hashed, err := c.saltedHash(password)
	if err != nil {
		c.logger.Error("saltedHash fail", zap.Error(err))
		return "", err
	}

	// 2.assemble, verFlag + salt + hashedVal
	rstVal := hashAlgoVerOne
	rstVal += fmt.Sprintf("%02X", salt)   // convert to  hex string
	rstVal += fmt.Sprintf("%02X", hashed) // convert to hex string
	return rstVal, nil
}

// pbkdf2WithSalt returns an encryption of password using salt.
func (c *Client) pbkdf2WithSalt(salt []byte, password string, algoVer int) []byte {
	pbkdf2Iter := pbkdf2Iter4096
	if algoVer == algoVer03 {
		pbkdf2Iter = pbkdf2Iter1000
	}
	dk := pbkdf2.Key([]byte(password), salt, pbkdf2Iter, pbkdf2KeyLen, sha256.New)
	return dk
}

// saltedPbkdf2 returns a salt and the PBKDF2-derived key of the password.
func (c *Client) saltedPbkdf2(password string, algoVer int) (salt, hash []byte, err error) {
	salt = make([]byte, SaltBytes)
	if _, err := io.ReadFull(crand.Reader, salt); err != nil {
		return nil, nil, err
	}

	return salt, c.pbkdf2WithSalt(salt, password, algoVer), nil
}

// genPbkdf2PwdVal generates the salted hash value of the password (using PBKDF2).
func (c *Client) genPbkdf2PwdVal(password string) (string, error) {
	// 1. generate a salt and hash of the password
	salt, hashed, err := c.saltedPbkdf2(password, c.optAlgoVer)
	if err != nil {
		c.logger.Error("saltedHash fail", zap.Error(err))
		return "", err
	}

	// 2.assemble (verFlag + salt + hashedVal(PBKDF2))
	var rstVal string
	switch c.optAlgoVer {
	case algoVer02:
		rstVal = hashAlgoVerTwo
	case algoVer03:
		rstVal = hashAlgoVerThree
	default:
		rstVal = hashAlgoVerTwo
	}
	rstVal += fmt.Sprintf("%02X", salt)   // convert to  hex string
	rstVal += fmt.Sprintf("%02X", hashed) // convert to hex string
	return rstVal, nil
}
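// The stored credential is therefore laid out as (values illustrative):
//
//	#Ver:002#<64 hex chars of salt><64 hex chars of derived key>
//
// since the 32-byte salt and the 32-byte PBKDF2 key each expand to 64 hex
// characters; CompareHashAndPlainPwd splits the string on exactly these
// boundaries.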

// compareHashAndPwdVerOne verifies a plaintext password against a stored version-one (SHA256) hash.
func (c *Client) compareHashAndPwdVerOne(hashed, plaintext string) error {
	if len(hashed) < len(hashAlgoVerOne)+SaltBytes*2 {
		return meta2.ErrHashedLength
	}
	verFlagLen := len(hashAlgoVerOne)
	saltStr := hashed[verFlagLen : verFlagLen+SaltBytes*2]
	hashStr := hashed[verFlagLen+SaltBytes*2:]

	salt, err := func(s string) ([]byte, error) {
		rstSlice := make([]byte, len(s)/2)
		for i := 0; i+1 < len(s); {
			uVal, err := strconv.ParseUint(s[i:i+2], 16, 8) //16 base, 8 bitSize
			if err != nil {
				c.logger.Error("hash pwd VerOne convert str to uint fail", zap.Error(err))
				return nil, err
			}
			rstSlice[i/2] = byte(uVal & 0xFF)
			i += 2
		}
		return rstSlice, nil
	}(saltStr)

	if err != nil {
		return err
	}

	// generate the hashed string value from the given plaintext password
	newHashStr := fmt.Sprintf("%02X", c.hashWithSalt(salt, plaintext))

	if hashStr != newHashStr {
		return meta2.ErrMismatchedHashAndPwd
	}
	return nil
}

// compareHashAndPwdVerTwo verifies a plaintext password against a stored version-two or version-three (PBKDF2) hash.
func (c *Client) compareHashAndPwdVerTwo(hashed, plaintext string, algoVer int) error {
	if len(hashed) < len(hashAlgoVerTwo)+SaltBytes*2 {
		return meta2.ErrHashedLength
	}
	verFlagLen := len(hashAlgoVerTwo)
	saltStr := hashed[verFlagLen : verFlagLen+SaltBytes*2]
	hashStr := hashed[verFlagLen+SaltBytes*2:]

	salt, err := func(s string) ([]byte, error) {
		rstSlice := make([]byte, len(s)/2)
		for i := 0; i+1 < len(s); {
			uVal, err := strconv.ParseUint(s[i:i+2], 16, 8) //16 base, 8 bitSize
			if err != nil {
				c.logger.Error("hash pwd VerTwo convert str to uint fail", zap.Error(err))
				return nil, err
			}
			rstSlice[i/2] = byte(uVal & 0xFF)
			i += 2
		}
		return rstSlice, nil
	}(saltStr)

	if err != nil {
		return err
	}

	// generate the PBKDF2-hashed string value from the given plaintext password
	dk := c.pbkdf2WithSalt(salt, plaintext, algoVer)
	newHashStr := fmt.Sprintf("%02X", dk)

	if hashStr != newHashStr {
		return meta2.ErrMismatchedHashAndPwd
	}
	return nil
}

// CompareHashAndPlainPwd compares a hashed password with its possible
// plaintext equivalent. It returns nil on success, or an error on failure.
func (c *Client) CompareHashAndPlainPwd(hashed, plaintext string) error {
	if len(hashed) < len(hashAlgoVerOne) {
		return meta2.ErrHashedLength
	}

	hashVer := hashed[:len(hashAlgoVerOne)]
	switch hashVer {
	case hashAlgoVerOne:
		return c.compareHashAndPwdVerOne(hashed, plaintext)
	case hashAlgoVerTwo:
		return c.compareHashAndPwdVerTwo(hashed, plaintext, algoVer02)
	case hashAlgoVerThree:
		return c.compareHashAndPwdVerTwo(hashed, plaintext, algoVer03)

	default:
		return meta2.ErrMismatchedHashAndPwd
	}
}

// CreateUser adds a user with the given name and password and admin status.
func (c *Client) CreateUser(name, password string, admin, rwuser bool) (meta2.User, error) {
	// verify name length
	if err := c.isValidName(name); err != nil {
		return nil, err
	}
	// verify password
	if err := c.isValidPwd(password, name); err != nil {
		return nil, err
	}

	data := c.cacheData.Clone()

	// See if the user already exists.
	if u := data.GetUser(name); u != nil {
		if err := c.CompareHashAndPlainPwd(u.Hash, password); err != nil || u.Admin != admin {
			return nil, meta2.ErrUserExists
		}
		return u, nil
	}

	// Forbid creating more than one admin user.
	if admin && data.HasAdminUser() {
		return nil, meta2.ErrUserForbidden
	}

	// Hash the password before serializing it.
	hash, err := c.genHashPwdVal(password)
	if err != nil {
		return nil, err
	}

	if err := c.retryUntilExec(proto2.Command_CreateUserCommand, proto2.E_CreateUserCommand_Command,
		&proto2.CreateUserCommand{
			Name:   proto.String(name),
			Hash:   proto.String(hash),
			Admin:  proto.Bool(admin),
			RwUser: proto.Bool(rwuser),
		},
	); err != nil {
		return nil, err
	}
	return c.User(name)
}

// UpdateUser updates the password of an existing user.
func (c *Client) UpdateUser(name, password string) error {
	// verify password
	if err := c.isValidPwd(password, name); err != nil {
		return err
	}

	// Hash the password before serializing it.
	hash, err := c.genHashPwdVal(password)
	if err != nil {
		return err
	}

	if u := c.cacheData.GetUser(name); u == nil {
		return meta2.ErrUserNotFound
	} else {
		if err = c.CompareHashAndPlainPwd(u.Hash, password); err == nil {
			return meta2.ErrPwdUsed
		}
	}
	return c.retryUntilExec(proto2.Command_UpdateUserCommand, proto2.E_UpdateUserCommand_Command,
		&proto2.UpdateUserCommand{
			Name: proto.String(name),
			Hash: proto.String(hash),
		},
	)
}

func (c *Client) isValidName(user string) error {
	if len(user) < minUsernameLen || len(user) > maxUsernameLen {
		return errno.NewError(errno.InvalidUsernameLen, minUsernameLen, maxUsernameLen)
	}
	return nil
}

func (c *Client) isValidPwd(s, user string) error {
	if len(s) < minPasswordLen || len(s) > maxPasswordLen {
		return errno.NewError(errno.InvalidPwdLen, minPasswordLen, maxPasswordLen)
	}

	if ok, _ := c.isInWeakPwdDict(s); ok {
		return errno.NewError(errno.InvalidPwdComplex)
	}

	if c.isSimilarToUserName(s, user) {
		return errno.NewError(errno.InvalidPwdLooks)
	}

	if !c.isStrengthPwd(s) {
		return errno.NewError(errno.InvalidPwdComplex)
	}

	return nil
}

func (c *Client) isSimilarToUserName(s, user string) bool {
	if len(s) != len(user) {
		return false
	}

	// password identical to the user name
	if s == user {
		return true
	}

	// password identical to the reversed user name
	slen := len(s)
	for i := 0; i < slen; i++ {
		if s[i] != user[slen-1-i] {
			return false
		}
	}
	return true
}

func (c *Client) isInWeakPwdDict(s string) (bool, error) {
	filename := c.weakPwdPath
	content, err := os.ReadFile(path.Clean(filename))
	if err != nil {
		return false, err
	}
	dec := unicode.BOMOverride(transform.Nop)
	content, _, err = transform.Bytes(dec, content)
	if err != nil {
		return false, err
	}
	weakPwdDict := strings.Split(string(content), "\r\n")
	for _, v := range weakPwdDict {
		if v == s {
			return true, nil
		}
	}
	return false, nil
}

func (c *Client) isStrengthPwd(s string) bool {
	var specChars = "-~@$#%_^!*+=?"
	var lowerCh, upperCh, digitCh, specCh bool
	for i := 0; i < len(s); i++ {
		ch := s[i]
		switch {
		case 'a' <= ch && ch <= 'z':
			lowerCh = true
		case 'A' <= ch && ch <= 'Z':
			upperCh = true
		case '0' <= ch && ch <= '9':
			digitCh = true
		case strings.Contains(specChars, string(ch)):
			specCh = true
		default:
			c.logger.Info("verify pwd strength:", zap.Any("unsupported char", string(ch)))
			return false
		}
	}
	c.logger.Info("verify pwd strength:", zap.Any("lower", lowerCh), zap.Any("upper", upperCh),
		zap.Any("digit", digitCh), zap.Any("spec", specCh))
	return lowerCh && upperCh && digitCh && specCh
}
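// For example, "Admin#123" satisfies all four character classes (lower case,
// upper case, digit, special character), while "admin12345" does not and is
// rejected.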

// DropUser removes the user with the given name.
func (c *Client) DropUser(name string) error {
	if u, err := c.User(name); err != nil {
		return err
	} else {
		if u.AuthorizeUnrestricted() {
			return meta2.ErrUserDropSelf
		}
	}
	return c.retryUntilExec(proto2.Command_DropUserCommand, proto2.E_DropUserCommand_Command,
		&proto2.DropUserCommand{
			Name: proto.String(name),
		},
	)
}

// SetPrivilege sets a privilege for the given user on the given database.
func (c *Client) SetPrivilege(username, database string, p originql.Privilege) error {
	return c.retryUntilExec(proto2.Command_SetPrivilegeCommand, proto2.E_SetPrivilegeCommand_Command,
		&proto2.SetPrivilegeCommand{
			Username:  proto.String(username),
			Database:  proto.String(database),
			Privilege: proto.Int32(int32(p)),
		},
	)
}

// SetAdminPrivilege sets or unsets admin privilege to the given username.
func (c *Client) SetAdminPrivilege(username string, admin bool) error {
	return c.retryUntilExec(proto2.Command_SetAdminPrivilegeCommand, proto2.E_SetAdminPrivilegeCommand_Command,
		&proto2.SetAdminPrivilegeCommand{
			Username: proto.String(username),
			Admin:    proto.Bool(admin),
		},
	)
}

// UserPrivileges returns the privileges for a user mapped by database name.
func (c *Client) UserPrivileges(username string) (map[string]originql.Privilege, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	p, err := c.cacheData.UserPrivileges(username)
	if err != nil {
		return nil, err
	}
	return p, nil
}

// UserPrivilege returns the privilege for the given user on the given database.
func (c *Client) UserPrivilege(username, database string) (*originql.Privilege, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	p, err := c.cacheData.UserPrivilege(username, database)
	if err != nil {
		return nil, err
	}
	return p, nil
}

// AdminUserExists returns true if any user has admin privilege.
func (c *Client) AdminUserExists() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.AdminUserExist()
}

func (c *Client) updateAuthCacheData() {
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()
	lockTicker := time.NewTicker(500 * time.Millisecond)
	defer lockTicker.Stop()

	for {
		select {
		case rcd := <-c.arChan:
			c.dealOnceAuthRecord(rcd)
		case <-lockTicker.C:
			c.updateUserLockData()
		case <-ticker.C:
			c.clearInvalidAuthData()
		case <-c.closing:
			return
		}
	}
}

func (c *Client) updateUserLockData() {
	c.muAuthData.Lock()
	defer c.muAuthData.Unlock()

	// clear expired locks (unlock users)
	var unlockLst []string
	for k, v := range c.authFailRcds {
		if len(v.occurTimeLst) >= maxLoginLimit {
			if time.Since(v.occurTimeLst[maxLoginLimit-1]).Seconds() >= lockUserTime {
				unlockLst = append(unlockLst, k)
			}
		}
	}

	for _, u := range unlockLst {
		c.logger.Info("unlock user", zap.String("user", u))
		delete(c.authFailRcds, u)
	}
}

func (c *Client) dealOncAuthSuccRcd(r *authRcd) {
	if !r.result {
		return
	}

	// discard invalid authSuccRcd
	if v, ok := c.authSuccRcds[r.user]; ok {
		if v.After(r.occurTime) {
			return
		}
	}
	// record the latest successful authentication
	c.authSuccRcds[r.user] = r.occurTime

	// refresh authFailRcds, clear invalid auth fail rcd.
	if v, ok := c.authFailRcds[r.user]; ok {
		var tmp []time.Time
		existInvalidRcd := false
		for _, tv := range v.occurTimeLst {
			if r.occurTime.After(tv) {
				existInvalidRcd = true
				continue
			}
			tmp = append(tmp, tv)
		}
		if !existInvalidRcd {
			return
		}
		if len(tmp) == 0 {
			delete(c.authFailRcds, r.user)
			return
		}
		v.occurTimeLst = tmp
		c.authFailRcds[r.user] = v
	}
}

func (c *Client) dealOncAuthFailRcd(r *authRcd) {
	if r.result {
		return
	}

	// discard invalid authFailRcd
	if v, ok := c.authSuccRcds[r.user]; ok {
		if v.After(r.occurTime) {
			return
		}
	}

	// refresh the auth-fail cache data
	if v, ok := c.authFailRcds[r.user]; ok {
		v.occurTimeLst = append(v.occurTimeLst, r.occurTime)
		sort.Slice(v.occurTimeLst, func(i, j int) bool {
			return v.occurTimeLst[j].After(v.occurTimeLst[i])
		})
		if len(v.occurTimeLst) > maxLoginLimit {
			v.occurTimeLst = v.occurTimeLst[0:maxLoginLimit]
		}
		c.authFailRcds[r.user] = v
	} else {
		c.authFailRcds[r.user] = authFailCache{
			user: r.user, occurTimeLst: []time.Time{r.occurTime}}
	}
	c.logger.Info(r.user, zap.Any("auth fail count", len(c.authFailRcds[r.user].occurTimeLst)))
}

func (c *Client) dealOnceAuthRecord(r *authRcd) {
	const maxDealLimit = 20
	realNum := len(c.arChan)
	if realNum > maxDealLimit-1 {
		realNum = maxDealLimit - 1
	}

	rcdLst := []*authRcd{r}
	for i := 1; i < realNum; i++ {
		select {
		case rcd := <-c.arChan:
			rcdLst = append(rcdLst, rcd)
		default:
			//lint:ignore SA4011 ineffective break statement. Did you mean to break out of the outer loop?
			break
		}
	}

	c.muAuthData.Lock()
	defer c.muAuthData.Unlock()
	for _, rcd := range rcdLst {
		c.dealOncAuthSuccRcd(rcd)
		c.dealOncAuthFailRcd(rcd)
	}
}

func (c *Client) clearInvalidAuthData() {
	c.muAuthData.Lock()
	defer c.muAuthData.Unlock()

	// clear invalid auth rcd
	var userLst []string
	for k, v := range c.authFailRcds {
		if time.Since(v.occurTimeLst[len(v.occurTimeLst)-1]).Seconds() >= maxLoginValidTime {
			c.logger.Info("delete invalid authFailrcd", zap.String("user", k), zap.Any("time data", v.occurTimeLst))
			userLst = append(userLst, k)
		}
	}
	for _, u := range userLst {
		delete(c.authFailRcds, u)
	}

	userLst = []string{}
	for k, v := range c.authSuccRcds {
		if time.Since(v).Seconds() >= maxLoginValidTime {
			c.logger.Info("delete invalid asc rcd", zap.String("user", k), zap.Any("time", v))
			userLst = append(userLst, k)
		}
	}
	for _, u := range userLst {
		delete(c.authSuccRcds, u)
	}
}

func (c *Client) isLockedUser(u string) bool {
	c.muAuthData.RLock()
	defer c.muAuthData.RUnlock()
	if v, ok := c.authFailRcds[u]; ok {
		if len(v.occurTimeLst) >= maxLoginLimit {
			c.logger.Info("The user has been locked.", zap.String("user", u))
			return true
		}
	}
	return false
}

// Authenticate returns a UserInfo if the username and password match an existing entry.
func (c *Client) Authenticate(username, password string) (u meta2.User, e error) {
	// check whether the user is currently locked
	if c.isLockedUser(username) {
		return nil, meta2.ErrUserLocked
	}
	// record the auth result so the lockout cache can be refreshed
	defer func() {
		rst := e == nil
		c.arChan <- &authRcd{user: username, result: rst, occurTime: time.Now()}
	}()

	// Find user.
	c.mu.RLock()
	userInfo := c.cacheData.GetUser(username)
	c.mu.RUnlock()
	if userInfo == nil {
		return nil, meta2.ErrUserNotFound
	}

	// Check the local auth cache first.
	c.mu.RLock()
	au, ok := c.authCache[username]
	c.mu.RUnlock()
	if ok {
		// verify the password using the cached salt and hash
		if bytes.Equal(c.encryptWithSalt(au.salt, password), au.hash) {
			return userInfo, nil
		}

		// fall through to the full hash comparison below when the cached hash does not match
	}

	// Compare password with user hash.
	if err := c.CompareHashAndPlainPwd(userInfo.Hash, password); err != nil {
		return nil, meta2.ErrAuthenticate
	}

	// generate a salt and hash of the password for the cache
	salt, hashed, err := c.saltedEncryptByVer(password)
	if err != nil {
		return nil, err
	}
	c.mu.Lock()
	c.authCache[username] = authUser{salt: salt, hash: hashed, bhash: userInfo.Hash}
	c.mu.Unlock()
	return userInfo, nil
}
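// A minimal authentication sketch (credentials are illustrative):
//
//	user, err := cli.Authenticate("app_user", "Admin#123")
//	if err != nil {
//		// err may be meta2.ErrUserLocked, meta2.ErrUserNotFound or meta2.ErrAuthenticate
//	}
//	_ = user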

// UserCount returns the number of users stored.
func (c *Client) UserCount() int {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return len(c.cacheData.Users)
}

// ShardIDs returns a list of all shard ids.
func (c *Client) ShardIDs() []uint64 {
	c.mu.RLock()

	var a []uint64
	for _, dbi := range c.cacheData.Databases {
		for _, rpi := range dbi.RetentionPolicies {
			for _, sgi := range rpi.ShardGroups {
				for _, si := range sgi.Shards {
					a = append(a, si.ID)
				}
			}
		}
	}
	c.mu.RUnlock()
	sort.Sort(uint64Slice(a))
	return a
}

// ShardGroupsByTimeRange returns a list of all shard groups on a database and policy that may contain data
// for the specified time range. Shard groups are sorted by start time.
func (c *Client) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta2.ShardGroupInfo, err error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// Find retention policy.
	rpi, err := c.cacheData.RetentionPolicy(database, policy)
	if err != nil {
		return nil, err
	} else if rpi == nil {
		return nil, meta2.ErrRetentionPolicyNotFound(policy)
	}
	groups := make([]meta2.ShardGroupInfo, 0, len(rpi.ShardGroups))
	for _, g := range rpi.ShardGroups {
		if g.Deleted() || !g.Overlaps(min, max) {
			continue
		}
		groups = append(groups, g)
	}
	return groups, nil
}

// ShardsByTimeRange returns a slice of shards that may contain data in the time range.
func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta2.ShardInfo, err error) {
	m := make(map[*meta2.ShardInfo]struct{})
	for _, mm := range sources.Measurements() {
		groups, err := c.ShardGroupsByTimeRange(mm.Database, mm.RetentionPolicy, tmin, tmax)
		if err != nil {
			return nil, err
		}
		for _, g := range groups {
			for i := range g.Shards {
				m[&g.Shards[i]] = struct{}{}
			}
		}
	}

	a = make([]meta2.ShardInfo, 0, len(m))
	for sh := range m {
		a = append(a, *sh)
	}

	return a, nil
}

// DropShard deletes a shard by ID.
func (c *Client) DropShard(id uint64) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	data := c.cacheData.Clone()
	data.DropShard(id)
	return nil
}

// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
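// A minimal usage sketch (hypothetical values; assumes the TSSTORE engine type from lib/config):
//
//	sg, err := c.CreateShardGroup("db0", "autogen", time.Now(), 0, config.TSSTORE)
//	if err != nil {
//		// handle error
//	}
//	_ = sg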
func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time, version uint32, engineType config.EngineType) (*meta2.ShardGroupInfo, error) {
	c.mu.RLock()
	sg, tier, err := c.cacheData.GetTierOfShardGroup(database, policy, timestamp, c.ShardTier, engineType)
	if err != nil {
		c.mu.RUnlock()
		return nil, err
	}
	if sg != nil {
		c.mu.RUnlock()
		return sg, nil
	}
	c.mu.RUnlock()

	cmd := &proto2.CreateShardGroupCommand{
		Database:   proto.String(database),
		Policy:     proto.String(policy),
		Timestamp:  proto.Int64(timestamp.UnixNano()),
		ShardTier:  proto.Uint64(tier),
		EngineType: proto.Uint32(uint32(engineType)),
		Version:    proto.Uint32(version),
	}

	if err := c.retryUntilExec(proto2.Command_CreateShardGroupCommand, proto2.E_CreateShardGroupCommand_Command, cmd); err != nil {
		return nil, err
	}

	rpi, err := c.RetentionPolicy(database, policy)
	if err != nil {
		return nil, err
	} else if rpi == nil || rpi.MarkDeleted {
		return nil, errors.New("retention policy deleted after shard group created")
	}

	return rpi.ShardGroupByTimestampAndEngineType(timestamp, engineType), nil
}

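// GetShardInfoByTime returns the shard that the ptIdx-th partition owned by nodeId maps to
// in the shard group covering time t for the given database, retention policy and engine type.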
func (c *Client) GetShardInfoByTime(database, retentionPolicy string, t time.Time, ptIdx int, nodeId uint64, engineType config.EngineType) (*meta2.ShardInfo, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	rp, err := c.cacheData.RetentionPolicy(database, retentionPolicy)
	if err != nil {
		return nil, err
	}
	if rp.MarkDeleted {
		return nil, errno.NewError(errno.RpNotFound)
	}
	shardGroup := rp.ShardGroupByTimestampAndEngineType(t, engineType)
	if shardGroup == nil {
		return nil, errno.NewError(errno.ShardNotFound)
	}
	if shardGroup.Deleted() {
		return nil, errno.NewError(errno.ShardNotFound)
	}

	info := c.cacheData.PtView[database]
	if info == nil {
		return nil, fmt.Errorf("db %v in PtView not exist", database)
	}
	cnt, ptId := 0, uint32(math.MaxUint32)
	for i := range info {
		if info[i].Owner.NodeID == nodeId {
			if ptIdx == cnt {
				ptId = info[i].PtId
				cnt++
				break
			} else {
				cnt++
			}
		}
	}
	if cnt == 0 || ptId == math.MaxUint32 {
		return nil, errors.New("nodeId cannot find pt")
	}

	shard := shardGroup.Shards[ptId]
	return &shard, nil
}

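// getAliveShardsForWAF returns the indexes of shards whose first owner pt is Online,
// used under the write-available-first HA policy.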
func (c *Client) getAliveShardsForWAF(database string, sgi *meta2.ShardGroupInfo) []int {
	c.mu.RLock()
	aliveShardIdxes := make([]int, 0, len(sgi.Shards))
	for i := range sgi.Shards {
		if c.cacheData.PtView[database][sgi.Shards[i].Owners[0]].Status == meta2.Online {
			aliveShardIdxes = append(aliveShardIdxes, i)
		}
	}
	c.mu.RUnlock()
	return aliveShardIdxes
}

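// getAliveShardsForSSAndRep returns the indexes of shards to use under the shared-storage
// and replication policies: every shard when replicaN == 1, otherwise only shards owned by a master pt.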
func (c *Client) getAliveShardsForSSAndRep(database string, sgi *meta2.ShardGroupInfo) []int {
	repGroups := c.DBRepGroups(database)
	replicaN, err := c.GetReplicaN(database)
	if replicaN == 0 || err != nil {
		replicaN = 1
	}

	c.mu.RLock()
	aliveShardIdxes := make([]int, 0, len(sgi.Shards)/replicaN)
	ptView := c.cacheData.PtView[database]
	for i := range sgi.Shards {
		for _, ptId := range sgi.Shards[i].Owners {
			if replicaN == 1 {
				aliveShardIdxes = append(aliveShardIdxes, i)
				break
			}
			if repGroups[ptView[ptId].RGID].IsMasterPt(ptId) {
				aliveShardIdxes = append(aliveShardIdxes, i)
				break
			}
		}
	}
	c.mu.RUnlock()
	return aliveShardIdxes
}

/*
GetAliveShards is used to map shards during the select and write processes.
For the shared-storage and replication policies, the write process maps all shards.
*/
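//
// A minimal usage sketch (hypothetical names; sgi comes from a retention policy's ShardGroups):
//
//	idxes := c.GetAliveShards("db0", &sgi)
//	for _, i := range idxes {
//		_ = sgi.Shards[i] // only map the alive shards
//	}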
func (c *Client) GetAliveShards(database string, sgi *meta2.ShardGroupInfo) []int {
	if config.GetHaPolicy() != config.WriteAvailableFirst {
		return c.getAliveShardsForSSAndRep(database, sgi)
	}
	return c.getAliveShardsForWAF(database, sgi)
}

func (c *Client) DeleteIndexGroup(database, policy string, id uint64) error {
	cmd := &proto2.DeleteIndexGroupCommand{
		Database:     proto.String(database),
		Policy:       proto.String(policy),
		IndexGroupID: proto.Uint64(id),
	}
	_, err := c.retryExec(proto2.Command_DeleteIndexGroupCommand, proto2.E_DeleteIndexGroupCommand_Command, cmd)
	return err
}

// DeleteShardGroup removes a shard group from a database and retention policy by id.
func (c *Client) DeleteShardGroup(database, policy string, id uint64) error {
	cmd := &proto2.DeleteShardGroupCommand{
		Database:     proto.String(database),
		Policy:       proto.String(policy),
		ShardGroupID: proto.Uint64(id),
	}

	_, err := c.retryExec(proto2.Command_DeleteShardGroupCommand, proto2.E_DeleteShardGroupCommand_Command, cmd)
	return err
}

// PruneGroupsCommand sends the prune command to the meta service; there is no need to waitForIndex.
func (c *Client) PruneGroupsCommand(shardGroup bool, id uint64) error {
	cmd := &proto2.PruneGroupsCommand{
		ShardGroup: proto.Bool(shardGroup),
		ID:         proto.Uint64(id),
	}
	_, err := c.retryExec(proto2.Command_PruneGroupsCommand, proto2.E_PruneGroupsCommand_Command, cmd)
	if err != nil {
		return err
	}
	return nil
}

func (c *Client) UpdateShardInfoTier(shardID uint64, tier uint64, dbName, rpName string) error {
	cmd := &proto2.UpdateShardInfoTierCommand{
		ShardID: proto.Uint64(shardID),
		Tier:    proto.Uint64(tier),
		DbName:  proto.String(dbName),
		RpName:  proto.String(rpName),
	}
	_, err := c.retryExec(proto2.Command_UpdateShardInfoTierCommand, proto2.E_UpdateShardInfoTierCommand_Command, cmd)
	if err != nil {
		return err
	}
	return nil
}

// ShardOwner returns the owning shard group info for a specific shard.
func (c *Client) ShardOwner(shardID uint64) (database, policy string, sgi *meta2.ShardGroupInfo) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	for dbIdx := range c.cacheData.Databases {
		rps := c.cacheData.Databases[dbIdx].RetentionPolicies
		for rpIdx := range rps {
			for sgIdx := range rps[rpIdx].ShardGroups {
				sg := &rps[rpIdx].ShardGroups[sgIdx]
				if sg.Deleted() {
					continue
				}
				for shIdx := range sg.Shards {
					if sg.Shards[shIdx].ID == shardID {
						database = c.cacheData.Databases[dbIdx].Name
						policy = rps[rpIdx].Name
						sgi = sg
						return
					}
				}
			}
		}
	}
	return
}

// JoinMetaServer will add the passed in tcpAddr to the raft peers and add a MetaNode to
// the metastore
func (c *Client) JoinMetaServer(httpAddr, rpcAddr, tcpAddr string) (*meta2.NodeInfo, error) {
	node := &meta2.NodeInfo{
		Host:    httpAddr,
		RPCAddr: rpcAddr,
		TCPHost: tcpAddr,
	}
	b, err := json.Marshal(node)
	if err != nil {
		return nil, err
	}

	currentServer := 0
	for {
		c.mu.RLock()
		if currentServer >= len(c.metaServers) {
			// We've tried every server, wait a second before
			// trying again
			time.Sleep(errSleep)
			currentServer = 0
		}
		c.mu.RUnlock()

		callback := &JoinCallback{NodeInfo: node}
		msg := message.NewMetaMessage(message.UpdateRequestMessage, &message.UpdateRequest{Body: b})
		err = c.SendRPCMsg(currentServer, msg, callback)
		if err != nil {
			currentServer++
			continue
		}
		// Successfully joined
		break
	}
	return node, nil
}

func (c *Client) CreateMetaNode(httpAddr, tcpAddr string) (*meta2.NodeInfo, error) {
	cmd := &proto2.CreateMetaNodeCommand{
		HTTPAddr: proto.String(httpAddr),
		TCPAddr:  proto.String(tcpAddr),
		Rand:     proto.Uint64(uint64(rand.Int63())),
	}

	if err := c.retryUntilExec(proto2.Command_CreateMetaNodeCommand, proto2.E_CreateMetaNodeCommand_Command, cmd); err != nil {
		return nil, err
	}

	n := c.MetaNodeByAddr(httpAddr)
	if n == nil {
		return nil, errors.New("new meta node not found")
	}

	c.nodeID = n.ID

	return n, nil
}

func (c *Client) DeleteMetaNode(id uint64) error {
	cmd := &proto2.DeleteMetaNodeCommand{
		ID: proto.Uint64(id),
	}

	return c.retryUntilExec(proto2.Command_DeleteMetaNodeCommand, proto2.E_DeleteMetaNodeCommand_Command, cmd)
}

// validateURL returns an error if the URL does not have a port or uses a scheme other than HTTP.
func validateURL(input string) error {
	u, err := url.Parse(input)
	if err != nil {
		return errors.New("invalid url")
	}

	if u.Scheme != "http" && u.Scheme != "https" {
		return errors.New("invalid url")
	}

	_, port, err := net.SplitHostPort(u.Host)
	if err != nil || port == "" {
		return errors.New("invalid url")
	}

	return nil
}

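// pingServer issues a GET request to the destination's /ping endpoint to verify it is reachable.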
func pingServer(server string) error {
	pingUrl := server + "/ping"
	client := http.Client{Timeout: time.Second}
	req, err := http.NewRequest("GET", pingUrl, nil)
	if err != nil {
		return err
	}
	_, err = client.Do(req)
	if err != nil {
		return err
	}
	return nil
}

// CreateSubscription creates a subscription against the given database and retention policy.
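// A minimal usage sketch (hypothetical values; every destination must be a reachable http(s) URL with a port):
//
//	err := c.CreateSubscription("db0", "autogen", "sub0", "ALL", []string{"http://127.0.0.1:9092"})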
func (c *Client) CreateSubscription(database, rp, name, mode string, destinations []string) error {
	for _, destination := range destinations {
		if err := validateURL(destination); err != nil {
			return fmt.Errorf("invalid url %s", destination)
		}
		if err := pingServer(destination); err != nil {
			return fmt.Errorf("fail to ping %s", destination)
		}
	}
	return c.retryUntilExec(proto2.Command_CreateSubscriptionCommand, proto2.E_CreateSubscriptionCommand_Command,
		&proto2.CreateSubscriptionCommand{
			Database:        proto.String(database),
			RetentionPolicy: proto.String(rp),
			Name:            proto.String(name),
			Mode:            proto.String(mode),
			Destinations:    destinations,
		},
	)
}

// DropSubscription removes the named subscription from the given database and retention policy.
func (c *Client) DropSubscription(database, rp, name string) error {
	return c.retryUntilExec(proto2.Command_DropSubscriptionCommand, proto2.E_DropSubscriptionCommand_Command,
		&proto2.DropSubscriptionCommand{
			Database:        proto.String(database),
			RetentionPolicy: proto.String(rp),
			Name:            proto.String(name),
		},
	)
}

func (c *Client) GetMaxSubscriptionID() uint64 {
	return c.cacheData.MaxSubscriptionID
}

// SetData overwrites the underlying data in the meta store.
func (c *Client) SetData(data *meta2.Data) error {
	return c.retryUntilExec(proto2.Command_SetDataCommand, proto2.E_SetDataCommand_Command,
		&proto2.SetDataCommand{
			Data: data.Marshal(),
		},
	)
}

// WaitForDataChanged returns a channel that is closed when
// the metastore data has changed.
func (c *Client) WaitForDataChanged() chan struct{} {
	ch := make(chan struct{})

	c.changed <- ch
	return ch
}

func (c *Client) index() uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.Index
}

// retryUntilExec will attempt the command on each of the metaservers until it succeeds,
// times out, or the client is closed, and then waits for the local cache to reach the command's index.
func (c *Client) retryUntilExec(typ proto2.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
	index, err := c.retryExec(typ, desc, value)
	if err != nil {
		return err
	}
	c.waitForIndex(index)
	return nil
}

func (c *Client) retryExec(typ proto2.Command_Type, desc *proto.ExtensionDesc, value interface{}) (index uint64, err error) {
	// TODO: do not rely on the index to check whether the cache data is the newest
	tries := 0
	currentServer := connectedServer
	timeout := time.After(RetryExecTimeout)

	for {
		c.mu.RLock()
		// exit if we're closed
		select {
		case <-timeout:
			c.mu.RUnlock()
			return c.index(), meta2.ErrCommandTimeout
		case <-c.closing:
			c.mu.RUnlock()
			return c.index(), meta2.ErrClientClosed
		default:
			// we're still open, continue on
		}
		c.mu.RUnlock()

		// pick the next metaserver to send the command to
		var server string

		c.mu.RLock()
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		server = c.metaServers[currentServer]
		c.mu.RUnlock()

		index, err = c.exec(currentServer, typ, desc, value, message.ExecuteRequestMessage)

		if err == nil {
			return index, nil
		}
		tries++
		currentServer++

		if strings.Contains(err.Error(), "node is not the leader") {
			continue
		}

		if _, ok := err.(errCommand); ok {
			return c.index(), err
		}
		c.logger.Info("retryUntilExec retry", zap.String("server", server), zap.Any("type", typ),
			zap.Int("tries", tries), zap.Error(err))
		time.Sleep(errSleep)
	}
}

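// getMetaMsg wraps the marshaled command body into the corresponding meta message type;
// it returns nil for unsupported message types.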
func getMetaMsg(msgTy uint8, body []byte) *message.MetaMessage {
	switch msgTy {
	case message.ExecuteRequestMessage:
		return message.NewMetaMessage(msgTy, &message.ExecuteRequest{Body: body})
	case message.ReportRequestMessage:
		return message.NewMetaMessage(msgTy, &message.ReportRequest{Body: body})
	default:
		return nil
	}
}

func (c *Client) exec(currentServer int, typ proto2.Command_Type, desc *proto.ExtensionDesc, value interface{}, msgTy uint8) (index uint64, err error) {
	// Create command.
	cmd := &proto2.Command{Type: &typ}
	if err := proto.SetExtension(cmd, desc, value); err != nil {
		panic(err)
	}

	b, err := proto.Marshal(cmd)
	if err != nil {
		return 0, err
	}

	callback := &ExecuteAndReportCallback{Typ: msgTy}
	msg := getMetaMsg(msgTy, b)
	err = c.SendRPCMsg(currentServer, msg, callback)
	if err != nil {
		return 0, err
	} else if callback.ErrCommand != nil {
		return 0, *callback.ErrCommand
	}
	return callback.Index, nil
}

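// waitForIndex blocks until the locally cached meta data reaches at least index idx.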
func (c *Client) waitForIndex(idx uint64) {
	for {
		ch := c.WaitForDataChanged()
		c.mu.RLock()
		if c.cacheData.Index >= idx {
			c.mu.RUnlock()
			return
		}
		c.mu.RUnlock()
		<-ch
	}
}

// Role: sql/store/meta
func (c *Client) pollForUpdates(role Role) {
	for {
		data := c.retryUntilSnapshot(role, c.index())
		if data == nil {
			// this will only be nil if the client has been closed,
			// so we can exit out
			c.logger.Error("client has been closed")
			return
		}
		// update the data and notify of the change
		c.mu.Lock()
		idx := c.cacheData.Index
		if idx < data.Index {
			c.cacheData = data
			c.updateAuthCache()
			c.replicaInfoManager.Update(data, c.nodeID)
			for len(c.changed) > 0 {
				notifyC := <-c.changed
				close(notifyC)
			}
		}
		c.mu.Unlock()
	}
}

func (c *Client) getNode(currentServer int, writeHost, queryHost, role string) (*meta2.NodeStartInfo, error) {
	callback := &CreateNodeCallback{
		NodeStartInfo: &meta2.NodeStartInfo{},
	}
	msg := message.NewMetaMessage(message.CreateNodeRequestMessage, &message.CreateNodeRequest{WriteHost: writeHost, QueryHost: queryHost, Role: role})
	err := c.SendRPCMsg(currentServer, msg, callback)
	if err != nil {
		return nil, err
	}
	return callback.NodeStartInfo, nil
}

func (c *Client) getShardInfo(currentServer int, cmd *proto2.Command) ([]byte, error) {
	b, err := proto.Marshal(cmd)
	if err != nil {
		return nil, err
	}
	callback := &GetShardInfoCallback{}
	msg := message.NewMetaMessage(message.GetShardInfoRequestMessage, &message.GetShardInfoRequest{Body: b})
	err = c.SendRPCMsg(currentServer, msg, callback)
	if err != nil {
		return nil, err
	}
	return callback.Data, nil
}

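// getSnapshot fetches a meta snapshot newer than index from the given server.
// A nil *Data with a nil error means the server had nothing newer to return.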
func (c *Client) getSnapshot(role Role, currentServer int, index uint64) (*meta2.Data, error) {
	c.logger.Debug("getting snapshot from start")

	callback := &SnapshotCallback{}
	msg := message.NewMetaMessage(message.SnapshotRequestMessage, &message.SnapshotRequest{Role: int(role), Index: index})
	err := c.SendRPCMsg(currentServer, msg, callback)
	if err != nil {
		return nil, err
	}
	c.logger.Debug("getting snapshot from end")

	if len(callback.Data) == 0 {
		return nil, nil
	}
	stat := statistics.NewMetaStatistics()
	stat.AddSnapshotTotal(1)
	stat.AddSnapshotDataSize(int64(len(callback.Data)))

	start := time.Now()
	data := &meta2.Data{}
	if err = data.UnmarshalBinary(callback.Data); err != nil {
		return nil, err
	}
	stat.AddSnapshotUnmarshalDuration(time.Since(start).Milliseconds())

	return data, nil
}

func (c *Client) getDownSampleInfo(currentServer int) ([]byte, error) {
	callback := &GetDownSampleInfoCallback{}
	msg := message.NewMetaMessage(message.GetDownSampleInfoRequestMessage, &message.GetDownSampleInfoRequest{})
	err := c.SendRPCMsg(currentServer, msg, callback)
	if err != nil {
		return nil, err
	}
	return callback.Data, nil
}

func (c *Client) getRpMstInfos(currentServer int, dbName, rpName string, dataTypes []int64) ([]byte, error) {
	callback := &GetRpMstInfoCallback{}
	msg := message.NewMetaMessage(message.GetRpMstInfosRequestMessage, &message.GetRpMstInfosRequest{
		DbName:    dbName,
		RpName:    rpName,
		DataTypes: dataTypes,
	})
	err := c.SendRPCMsg(currentServer, msg, callback)
	if err != nil {
		return nil, err
	}
	return callback.Data, nil
}

// Peers returns the TCPHost addresses of all the metaservers
func (c *Client) Peers() []string {

	var peers Peers
	// query each server and keep track of who their peers are
	for currentServer := range c.metaServers {
		callback := &PeersCallback{}
		msg := message.NewMetaMessage(message.PeersRequestMessage, &message.PeersRequest{})
		err := c.SendRPCMsg(currentServer, msg, callback)
		if err != nil {
			// this meta-server might not be ready to answer, continue on
			continue
		}
		p := callback.Peers
		peers = peers.Append(p...)
	}

	// Return the unique set of peer addresses
	return []string(peers.Unique())
}

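// updateAuthCache drops cached credentials for users that were removed or whose password hash changed.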
func (c *Client) updateAuthCache() {
	// copy cached user info for still-present users
	newCache := make(map[string]authUser, len(c.authCache))

	for _, userInfo := range c.cacheData.Users {
		if cached, ok := c.authCache[userInfo.Name]; ok {
			if cached.bhash == userInfo.Hash {
				newCache[userInfo.Name] = cached
			}
		}
	}

	c.authCache = newCache
}

var connectedServer int

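// retryUntilSnapshot polls the metaservers for a snapshot newer than idx until one is
// returned or the client is closed.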
func (c *Client) retryUntilSnapshot(role Role, idx uint64) *meta2.Data {
	currentServer := connectedServer
	for {
		// get the index to look from and the server to poll
		c.mu.RLock()
		// exit if we're closed
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil
		default:
			// we're still open, continue on
		}

		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		server := c.metaServers[currentServer]
		c.mu.RUnlock()

		data, err := c.getSnapshot(role, currentServer, idx)

		if err == nil && data != nil {
			return data
		} else if err == nil && data == nil {
			continue
		}

		c.logger.Debug("failure getting snapshot from", zap.String("server", server), zap.Error(err))
		time.Sleep(errSleep)

		currentServer++
	}
}

func (c *Client) RetryDownSampleInfo() ([]byte, error) {
	startTime := time.Now()
	currentServer := connectedServer
	var err error
	var downSampleInfo []byte
	for {
		c.mu.RLock()
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil, meta2.ErrClientClosed
		default:
		}
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		downSampleInfo, err = c.getDownSampleInfo(currentServer)
		if err == nil {
			break
		}
		if time.Since(startTime).Seconds() > float64(len(c.metaServers))*HttpReqTimeout.Seconds() {
			break
		}
		time.Sleep(errSleep)
		currentServer++
	}
	return downSampleInfo, err
}

func (c *Client) RetryMstInfosInRp(dbName, rpName string, dataTypes []int64) ([]byte, error) {
	startTime := time.Now()
	currentServer := connectedServer
	var err error
	var mstInfos []byte
	for {
		c.mu.RLock()
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil, meta2.ErrClientClosed
		default:
		}
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		mstInfos, err = c.getRpMstInfos(currentServer, dbName, rpName, dataTypes)
		if err == nil {
			break
		}
		if time.Since(startTime).Seconds() > float64(len(c.metaServers))*HttpReqTimeout.Seconds() {
			break
		}
		time.Sleep(errSleep)
		currentServer++
	}
	return mstInfos, err
}

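// RetryGetShardAuxInfo retries the shard-info RPC across metaservers until it succeeds,
// the shard meta is reported missing, or the retry window elapses.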
func (c *Client) RetryGetShardAuxInfo(cmd *proto2.Command) ([]byte, error) {
	startTime := time.Now()
	currentServer := connectedServer
	var err error
	var shardAuxInfo []byte
	for {
		c.mu.RLock()
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil, meta2.ErrClientClosed
		default:

		}

		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		shardAuxInfo, err = c.getShardInfo(currentServer, cmd)
		if err == nil {
			break
		}
		if errno.Equal(err, errno.ShardMetaNotFound) {
			c.logger.Error("get shard info failed", zap.String("cmd", cmd.String()), zap.Error(err))
			break
		}

		if time.Since(startTime).Nanoseconds() > int64(len(c.metaServers))*HttpReqTimeout.Nanoseconds() {
			c.logger.Error("get shard info timeout", zap.String("cmd", cmd.String()), zap.Error(err))
			break
		}
		c.logger.Error("retry get shard info", zap.String("cmd", cmd.String()), zap.Error(err))
		time.Sleep(errSleep)

		currentServer++
	}
	return shardAuxInfo, err
}

func (c *Client) GetShardRangeInfo(db string, rp string, shardID uint64) (*meta2.ShardTimeRangeInfo, error) {
	val := &proto2.TimeRangeCommand{
		Database: proto.String(db),
		Policy:   proto.String(rp),
		ShardID:  proto.Uint64(shardID),
	}

	t := proto2.Command_TimeRangeCommand
	cmd := &proto2.Command{Type: &t}
	if err := proto.SetExtension(cmd, proto2.E_TimeRangeCommand_Command, val); err != nil {
		panic(err)
	}
	b, err := c.RetryGetShardAuxInfo(cmd)
	if err != nil {
		return nil, err
	}

	rangeInfo := &meta2.ShardTimeRangeInfo{}
	if err := rangeInfo.UnmarshalBinary(b); err != nil {
		return nil, err
	}
	return rangeInfo, err
}

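// GetShardDurationInfo queries the meta service for shard duration info newer than index
// for this node (c.nodeID).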
func (c *Client) GetShardDurationInfo(index uint64) (*meta2.ShardDurationResponse, error) {
	val := &proto2.ShardDurationCommand{Index: proto.Uint64(index), Pts: nil, NodeId: proto.Uint64(c.nodeID)}
	t := proto2.Command_ShardDurationCommand
	cmd := &proto2.Command{Type: &t}
	if err := proto.SetExtension(cmd, proto2.E_ShardDurationCommand_Command, val); err != nil {
		panic(err)
	}
	b, err := c.RetryGetShardAuxInfo(cmd)
	if err != nil {
		return nil, err
	}

	durationInfo := &meta2.ShardDurationResponse{}
	if err := durationInfo.UnmarshalBinary(b); err != nil {
		return nil, err
	}
	return durationInfo, nil
}

func (c *Client) MetaServers() []string {
	return c.metaServers
}

func (c *Client) InitMetaClient(joinPeers []string, tlsEn bool, storageNodeInfo *StorageNodeInfo, role string) (uint64, uint64, uint64, error) {
	// It's the first time starting up; this node must join an existing cluster,
	// so at least one meta node host is required.
	if len(joinPeers) == 0 {
		return 0, 0, 0, fmt.Errorf("joinPeers must include meta node hosts")
	}
	}

	// join this node to the cluster
	c.SetMetaServers(joinPeers)
	c.SetTLS(tlsEn)

	var nid uint64
	var err error
	var clock uint64
	var connId uint64
	if storageNodeInfo != nil {
		nid, clock, connId, err = c.CreateDataNode(storageNodeInfo.InsertAddr, storageNodeInfo.SelectAddr, role)
	}

	return nid, clock, connId, err
}

func (c *Client) retryReport(typ proto2.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
	tries := 0
	currentServer := connectedServer
	timeout := time.After(RetryReportTimeout)

	for {
		c.mu.RLock()
		// exit if we're closed
		select {
		case <-timeout:
			c.mu.RUnlock()
			return meta2.ErrCommandTimeout
		case <-c.closing:
			c.mu.RUnlock()
			return nil
		default:
			// we're still open, continue on
		}
		c.mu.RUnlock()

		// pick the next metaserver to send the report to
		var server string
		c.mu.RLock()
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		server = c.metaServers[currentServer]
		c.mu.RUnlock()

		_, err := c.exec(currentServer, typ, desc, value, message.ReportRequestMessage)

		if err == nil {
			return nil
		}
		tries++
		currentServer++

		if strings.Contains(err.Error(), "node is not the leader") {
			continue
		}

		if _, ok := err.(errCommand); ok {
			return err
		}
		c.logger.Info("retryUntilExec retry", zap.String("server", server), zap.Any("type", typ),
			zap.Int("tries", tries), zap.Error(err))
		time.Sleep(errSleep)
	}
}

func (c *Client) ReportShardLoads(dbPTStats []*proto2.DBPtStatus) error {
	cmd := &proto2.ReportShardsLoadCommand{
		DBPTStat: dbPTStats,
	}
	return c.retryReport(proto2.Command_ReportShardsCommand, proto2.E_ReportShardsLoadCommand_Command, cmd)
}

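// Measurements returns the sorted, de-duplicated original names of the measurements in database that match ms.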
func (c *Client) Measurements(database string, ms influxql.Measurements) ([]string, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	mstMaps, err := c.matchMeasurements(database, ms)
	if err != nil {
		return nil, err
	}
	var measurements []string
	for _, mi := range mstMaps {
		measurements = append(measurements, mi.OriginName())
	}

	if len(measurements) == 0 {
		return measurements, nil
	}
	sort.Strings(measurements)

	l := len(measurements)
	for i := 1; i < l; i++ {
		if measurements[i] == measurements[i-1] {
			l--
			measurements = append(measurements[:i-1], measurements[i:]...)
			i--
		}
	}
	return measurements, nil
}

func (c *Client) MatchMeasurements(database string, ms influxql.Measurements) (map[string]*meta2.MeasurementInfo, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.matchMeasurements(database, ms)
}

func (c *Client) matchMeasurements(database string, ms influxql.Measurements) (map[string]*meta2.MeasurementInfo, error) {
	dbi, err := c.cacheData.GetDatabase(database)
	if err != nil {
		return nil, err
	}

	ret := make(map[string]*meta2.MeasurementInfo)
	dbi.WalkRetentionPolicy(func(rp *meta2.RetentionPolicyInfo) {
		rp.MatchMeasurements(ms, ret)
	})

	return ret, nil
}

func (c *Client) QueryTagKeys(database string, ms influxql.Measurements, cond influxql.Expr) (map[string]map[string]struct{}, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	mis, err := c.matchMeasurements(database, ms)
	if err != nil {
		return nil, err
	}
	if len(mis) == 0 {
		return nil, nil
	}

	// map[measurement name] map[tag key] struct{}
	ret := make(map[string]map[string]struct{}, len(mis))
	for _, m := range mis {
		if _, ok := ret[m.Name]; !ok {
			ret[m.Name] = make(map[string]struct{})
		}
		m.MatchTagKeys(cond, ret)
	}

	return ret, nil
}

func (c *Client) NewDownSamplePolicy(database, name string, info *meta2.DownSamplePolicyInfo) error {
	cmd := &proto2.CreateDownSamplePolicyCommand{
		Database:             proto.String(database),
		Name:                 proto.String(name),
		DownSamplePolicyInfo: info.Marshal(),
	}
	err := c.retryUntilExec(proto2.Command_CreateDownSamplePolicyCommand, proto2.E_CreateDownSamplePolicyCommand_Command, cmd)
	if err != nil {
		return err
	}
	return nil
}

func (c *Client) DropDownSamplePolicy(database, name string, dropAll bool) error {
	cmd := &proto2.DropDownSamplePolicyCommand{
		Database: proto.String(database),
		RpName:   proto.String(name),
		DropAll:  proto.Bool(dropAll),
	}
	err := c.retryUntilExec(proto2.Command_DropDownSamplePolicyCommand, proto2.E_DropDownSamplePolicyCommand_Command, cmd)
	if err != nil {
		return err
	}
	return nil
}

func (c *Client) ShowDownSamplePolicies(database string) (models.Rows, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowDownSamplePolicies(database)
}

func (c *Client) GetDownSamplePolicies() (*meta2.DownSamplePoliciesInfoWithDbRp, error) {
	b, err := c.RetryDownSampleInfo()
	if err != nil {
		return nil, err
	}
	return c.unmarshalDownSamplePolicies(b)
}

func (c *Client) unmarshalDownSamplePolicies(b []byte) (*meta2.DownSamplePoliciesInfoWithDbRp, error) {
	DownSample := &meta2.DownSamplePoliciesInfoWithDbRp{}
	if err := DownSample.UnmarshalBinary(b); err != nil {
		return nil, err
	}
	return DownSample, nil
}

func (c *Client) GetMstInfoWithInRp(dbName, rpName string, dataTypes []int64) (*meta2.RpMeasurementsFieldsInfo, error) {
	b, err := c.RetryMstInfosInRp(dbName, rpName, dataTypes)
	if err != nil {
		return nil, err
	}
	return c.unmarshalMstInfoWithInRp(b)
}

func (c *Client) unmarshalMstInfoWithInRp(b []byte) (*meta2.RpMeasurementsFieldsInfo, error) {
	DownSample := &meta2.RpMeasurementsFieldsInfo{}
	if err := DownSample.UnmarshalBinary(b); err != nil {
		return nil, err
	}
	return DownSample, nil
}

func (c *Client) UpdateShardDownSampleInfo(Ident *meta2.ShardIdentifier) error {
	val := &proto2.UpdateShardDownSampleInfoCommand{
		Ident: Ident.Marshal(),
	}
	if _, err := c.retryExec(proto2.Command_UpdateShardDownSampleInfoCommand, proto2.E_UpdateShardDownSampleInfoCommand_Command, val); err != nil {
		return err
	}
	return nil
}

func (c *Client) GetStreamInfos() map[string]*meta2.StreamInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.Streams
}

// GetDstStreamInfos gets the stream infos whose source measurement's db and rp match the given db and rp.
// Note: make sure dstSis is initialized
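//
// A minimal usage sketch (hypothetical names):
//
//	dstSis := make([]*meta2.StreamInfo, 0, 4)
//	if c.GetDstStreamInfos("db0", "autogen", &dstSis) {
//		// dstSis now holds the streams whose source measurement is in db0.autogen
//	}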
func (c *Client) GetDstStreamInfos(db, rp string, dstSis *[]*meta2.StreamInfo) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if len(c.cacheData.Streams) == 0 {
		return false
	}
	i := 0
	*dstSis = (*dstSis)[:cap(*dstSis)]
	for _, si := range c.cacheData.Streams {
		if si.SrcMst.Database == db && si.SrcMst.RetentionPolicy == rp {
			if len(*dstSis) < i+1 {
				*dstSis = append(*dstSis, si)
			} else {
				(*dstSis)[i] = si
			}
			i++
		}
	}
	*dstSis = (*dstSis)[:i]
	return len(*dstSis) > 0
}

func (c *Client) UpdateStreamMstSchema(database string, retentionPolicy string, mst string, stmt *influxql.SelectStatement) error {
	fieldToCreate := make([]*proto2.FieldSchema, 0, len(stmt.Fields)+len(stmt.Dimensions))
	fields := stmt.Fields
	for i := range fields {
		f, ok := fields[i].Expr.(*influxql.Call)
		if !ok {
			return errors.New("unexpected call function")
		}
		if fields[i].Alias == "" {
			fieldToCreate = append(fieldToCreate, &proto2.FieldSchema{
				FieldName: proto.String(f.Name + "_" + f.Args[0].(*influxql.VarRef).Val),
			})
		} else {
			fieldToCreate = append(fieldToCreate, &proto2.FieldSchema{
				FieldName: proto.String(fields[i].Alias),
			})
		}
		valuer := influxql.TypeValuerEval{
			TypeMapper: DefaultTypeMapper,
		}
		t, e := valuer.EvalType(f, false)
		if e != nil {
			return e
		}
		switch t {
		case influxql.Float:
			fieldToCreate[i].FieldType = proto.Int32(influx.Field_Type_Float)
		case influxql.Integer:
			fieldToCreate[i].FieldType = proto.Int32(influx.Field_Type_Int)
		case influxql.Boolean:
			fieldToCreate[i].FieldType = proto.Int32(influx.Field_Type_Int)
		case influxql.String:
			fieldToCreate[i].FieldType = proto.Int32(influx.Field_Type_String)
		default:
			return errors.New("unexpected call type")
		}
	}
	dims := stmt.Dimensions
	for i := range dims {
		d, ok := dims[i].Expr.(*influxql.VarRef)
		if !ok {
			continue
		}
		fieldToCreate = append(fieldToCreate, &proto2.FieldSchema{
			FieldName: proto.String(d.Val),
			FieldType: proto.Int32(influx.Field_Type_Tag),
		})
	}
	return c.UpdateSchema(database, retentionPolicy, mst, fieldToCreate)
}

func (c *Client) CreateStreamPolicy(info *meta2.StreamInfo) error {
	cmd := &proto2.CreateStreamCommand{
		StreamInfo: info.Marshal(),
	}
	err := c.retryUntilExec(proto2.Command_CreateStreamCommand, proto2.E_CreateStreamCommand_Command, cmd)
	if err != nil {
		return err
	}
	return nil
}

func (c *Client) GetStreamInfosStore() map[string]*meta2.StreamInfo {
	return c.RetryGetStreamInfosStore()
}

func (c *Client) RetryGetStreamInfosStore() map[string]*meta2.StreamInfo {
	startTime := time.Now()
	currentServer := connectedServer
	var infos map[string]*meta2.StreamInfo
	for {
		c.mu.RLock()
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil
		default:

		}

		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		infos = c.GetStreamInfosForStore(currentServer)
		if infos != nil {
			break
		}

		if time.Since(startTime).Seconds() > float64(len(c.metaServers))*HttpReqTimeout.Seconds() {
			break
		}
		time.Sleep(errSleep)

		currentServer++
	}
	return infos
}

func (c *Client) GetStreamInfosForStore(currentServer int) map[string]*meta2.StreamInfo {
	callback := &GetStreamInfoCallback{}
	msg := message.NewMetaMessage(message.GetStreamInfoRequestMessage, &message.GetStreamInfoRequest{Body: nil})
	err := c.SendRPCMsg(currentServer, msg, callback)
	if err != nil {
		c.logger.Error("GetStreamInfosForStore SendRPCMsg fail", zap.Error(err))
		return nil
	}
	pb := &proto2.StreamInfos{}
	err = proto.Unmarshal(callback.Data, pb)
	if err != nil {
		return nil
	}
	metaInfos := make(map[string]*meta2.StreamInfo)
	for _, v := range pb.GetInfos() {
		s := &meta2.StreamInfo{}
		s.Unmarshal(v)
		metaInfos[s.Name] = s
	}
	return metaInfos
}

func (c *Client) ShowStreams(database string, showAll bool) (models.Rows, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowStreams(database, showAll)
}

func (c *Client) DropStream(name string) error {
	cmd := &proto2.DropStreamCommand{
		Name: proto.String(name),
	}
	if err := c.retryUntilExec(proto2.Command_DropStreamCommand, proto2.E_DropStreamCommand_Command, cmd); err != nil {
		return err
	}
	return nil
}

var VerifyNodeEn = true

/*
verifyDataNodeStatus guards against the following case:
1. store1 suffers a network partition
2. the dbpt is assigned to store2
3. store1's network recovers
4. the dbpt is moved back to store1, which still holds this dbpt
5. rows are written to the dbpt; store1 has no lease and panics
attention:
if the lease were always recovered when assigning a dbpt, regardless of whether the db pt is already on this node,
flush might write older data to disk and compaction might pick files that have already been deleted.
stopping flush and compaction and then clearing the memtable may solve this problem, but new logs would need to be replayed.
*/
func (c *Client) verifyDataNodeStatus() {
	tries := 0
	for {
		select {
		case <-c.closing:
			return
		default:
			time.Sleep(10 * time.Second)
			if !VerifyNodeEn {
				continue
			}
			if err := c.retryVerifyDataNodeStatus(); err != nil {
				tries++
				c.logger.Error("Verify retry", zap.Int("tries", tries), zap.Error(err))
				if tries >= 3 {
					c.Suicide(err)
				}
				continue
			}
			tries = 0
		}
	}
}

func (c *Client) retryVerifyDataNodeStatus() error {
	startTime := time.Now()
	currentServer := connectedServer
	var err error
	for {
		c.mu.RLock()
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil
		default:
		}

		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		err = c.verifyDataNodeStatusCmd(currentServer, c.NodeID())
		if err == nil {
			break
		}
		c.logger.Debug("verify datanode status failed", zap.Uint64("node id", c.NodeID()), zap.Error(err), zap.Duration("duration", time.Since(startTime)))
		if time.Since(startTime).Seconds() > float64(len(c.metaServers))*HttpReqTimeout.Seconds() {
			break
		}
		time.Sleep(errSleep)

		currentServer++
	}
	return err
}

func (c *Client) verifyDataNodeStatusCmd(currentServer int, nodeID uint64) error {
	callback := &VerifyDataNodeStatusCallback{}
	msg := message.NewMetaMessage(message.VerifyDataNodeStatusRequestMessage, &message.VerifyDataNodeStatusRequest{NodeID: nodeID})
	return c.SendRPCMsg(currentServer, msg, callback)
}

func (c *Client) Suicide(err error) {
	c.logger.Error("Suicide for fault data node", zap.Error(err))
	time.Sleep(errSleep)
	sysinfo.Suicide()
}

func (c *Client) TagArrayEnabled(db string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if _, ok := c.cacheData.Databases[db]; !ok {
		return false
	}

	return c.cacheData.Databases[db].EnableTagArray
}

// RetryRegisterQueryIDOffset sends a register RPC to ts-meta to request a query id offset.
func (c *Client) RetryRegisterQueryIDOffset(host string) (uint64, error) {
	startTime := time.Now()
	currentServer := connectedServer
	var offset uint64
	var ok bool
	var err error

	for {
		c.mu.RLock()

		if offset, ok = c.cacheData.QueryIDInit[meta2.SQLHost(host)]; ok {
			c.logger.Info("current host has already registered in ts-meta")
			c.mu.RUnlock()
			return offset, nil
		}

		select {
		case <-c.closing:
			c.mu.RUnlock()
			return 0, meta2.ErrClientClosed
		default:
			// we're still open, continue on
		}
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()

		offset, err = c.registerOffset(currentServer, host)
		if err == nil {
			return offset, nil
		}

		currentServer++

		if strings.Contains(err.Error(), "node is not the leader") {
			continue
		}

		if time.Since(startTime).Seconds() > float64(len(c.metaServers))*HttpReqTimeout.Seconds() {
			return 0, errors.New("register query id offset timeout")
		}
		time.Sleep(errSleep)
	}
}

func (c *Client) registerOffset(currentServer int, host string) (uint64, error) {
	callback := &RegisterQueryIDOffsetCallback{}
	msg := message.NewMetaMessage(message.RegisterQueryIDOffsetRequestMessage, &message.RegisterQueryIDOffsetRequest{
		Host: host})
	err := c.SendRPCMsg(currentServer, msg, callback)
	if err != nil {
		return 0, err
	}
	return callback.Offset, nil
}

func (c *Client) ThermalShards(dbName string, start, end time.Duration) map[uint64]struct{} {
	startTime := time.Now()
	shards := make(map[uint64]struct{})
	var lt, rt time.Time

	if start != 0 {
		lt = time.Now().Add(-start).UTC()
	}
	if end != 0 {
		rt = time.Now().Add(end).UTC()
	}

	c.mu.RLock()
	defer c.mu.RUnlock()
	db, ok := c.cacheData.Databases[dbName]
	if !ok {
		return nil
	}

	for _, rp := range db.RetentionPolicies {
		if lt.IsZero() {
			lt = time.Now().Add(-rp.ShardGroupDuration).UTC()
		}
		if rt.IsZero() {
			rt = time.Now().Add(rp.ShardGroupDuration).UTC()
		}
		for i := 0; i < len(rp.ShardGroups); i++ {
			if rp.ShardGroups[i].Deleted() {
				continue
			}
			sg := &rp.ShardGroups[i]
			if sg.EndTime.Before(lt) || sg.EndTime.After(rt) {
				continue
			}
			for k := 0; k < len(sg.Shards); k++ {
				shards[sg.Shards[k].ID] = struct{}{}
			}
		}
	}
	c.logger.Info("thermal shards", zap.String("db", dbName), zap.Time("start", lt), zap.Time("end", rt),
		zap.Any("shards", shards), zap.Duration("duration", time.Since(startTime)))
	return shards
}

func (c *Client) UpdateMeasurement(db, rp, mst string, options *meta2.Options) error {
	_, err := c.Measurement(db, rp, mst)
	if err != nil {
		return err
	}
	cmd := &proto2.UpdateMeasurementCommand{
		Db:      proto.String(db),
		Rp:      proto.String(rp),
		Mst:     proto.String(mst),
		Options: options.Marshal(),
	}

	err = c.retryUntilExec(proto2.Command_UpdateMeasurementCommand, proto2.E_UpdateMeasurementCommand_Command, cmd)
	if err != nil {
		return err
	}

	return nil
}

// SetCacheData is only used for unit testing.
func (c *Client) SetCacheData(cacheData *meta2.Data) {
	c.cacheData = cacheData
}

func (c *Client) GetShardGroupByTimeRange(repoName, streamName string, min, max time.Time) ([]*meta2.ShardGroupInfo, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	db, err := c.cacheData.GetDatabase(repoName)
	if err != nil {
		return nil, err
	}
	if db.MarkDeleted {
		return nil, nil
	}
	rp, err := db.GetRetentionPolicy(streamName)
	if err != nil {
		return nil, err
	}
	if rp.MarkDeleted {
		return nil, nil
	}
	sg := rp.ShardGroupsByTimeRange(min, max)

	return sg, nil
}

func refreshConnectedServer(currentServer int) {
	if currentServer != connectedServer {
		connectedServer = currentServer
	}
}

type Peers []string

func (peers Peers) Append(p ...string) Peers {
	peers = append(peers, p...)

	return peers.Unique()
}

func (peers Peers) Unique() Peers {
	distinct := map[string]struct{}{}
	for _, p := range peers {
		distinct[p] = struct{}{}
	}

	var u Peers
	for k := range distinct {
		u = append(u, k)
	}
	return u
}

func (peers Peers) Contains(peer string) bool {
	for _, p := range peers {
		if p == peer {
			return true
		}
	}
	return false
}

type ErrRedirect struct {
	Host string
}

func (e ErrRedirect) Error() string {
	return fmt.Sprintf("redirect to %s", e.Host)
}

type errCommand struct {
	msg string
}

func (e errCommand) Error() string {
	return e.msg
}

type uint64Slice []uint64

func (a uint64Slice) Len() int           { return len(a) }
func (a uint64Slice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
